# Belajar PySpark - Skema pada DataFrame

...

In [2]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=86478410b050b956812746eceb7dc38e666dbe95503cb3dc3cdac2848cf0d4d0
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [38]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DecimalType,
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

In [4]:
# Start (or reuse) the Spark session that every cell below relies on.
spark = (
    SparkSession.builder
    .appName("Belajar PySpark - Skema DataFrame")
    .getOrCreate()
)

## Infer skema dari data

### Infer dari data eksternal

In [58]:
!wget https://raw.githubusercontent.com/urfie/Seri-Belajar-PySpark/main/dataset/mhs_header.csv

--2023-10-26 13:30:16--  https://raw.githubusercontent.com/urfie/Seri-Belajar-PySpark/main/dataset/mhs_header.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 152 [text/plain]
Saving to: ‘mhs_header.csv’


2023-10-26 13:30:16 (8.07 MB/s) - ‘mhs_header.csv’ saved [152/152]



In [64]:
# Plain Python rows: a name, a department code, and three scores.
data = [
    ['Agus', 'F', 100, 150, 150],
    ['Budi', 'B', 200, 100, 150],
    ['Dina', 'F', 150, 150, 130],
    ['Dedi', 'B', 50, 100, 100],
]

# No schema is passed, so Spark infers the column types from the values
# and falls back to positional column names (_1, _2, ...).
df = spark.createDataFrame(data)
df.show()
df.printSchema()

+----+---+---+---+---+
|  _1| _2| _3| _4| _5|
+----+---+---+---+---+
|Agus|  F|100|150|150|
|Budi|  B|200|100|150|
|Dina|  F|150|150|130|
|Dedi|  B| 50|100|100|
+----+---+---+---+---+

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)
 |-- _4: long (nullable = true)
 |-- _5: long (nullable = true)



### Infer dari object python

In [59]:
# header=True takes the column names from the first line of the file;
# inferSchema=True lets Spark derive the column types from the contents.
df_infer = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("mhs_header.csv")
)
df_infer.printSchema()
df_infer.show()

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

+-----+------------+------+------+------+
| nama|kode_jurusan|nilai1|nilai2|nilai3|
+-----+------------+------+------+------+
| Agus|           F|   100|   150|   150|
|Windy|           F|   200|   150|   180|
| Budi|           B|   200|   100|   150|
| Dina|           F|   150|   150|   130|
| Bayu|           F|    50|   150|   100|
| Dedi|           B|    50|   100|   100|
+-----+------------+------+------+------+



## Mendefinisikan Skema secara programatik menggunakan `StructType`

In [5]:
# Sample rows: a name, a department code, and three scores.
data = [
    ['Agus', 'F', 100, 150, 150],
    ['Budi', 'B', 200, 100, 150],
    ['Dina', 'F', 150, 150, 130],
    ['Dedi', 'B', 50, 100, 100],
]

# Explicit schema: one StructField(name, type, nullable) per column.
# Brackets already continue the expression, so no backslashes are needed.
mySchema = StructType([
    StructField('nama', StringType(), True),
    StructField('kode_jurusan', StringType(), True),
    StructField('nilai1', IntegerType(), True),
    StructField('nilai2', IntegerType(), True),
    StructField('nilai3', IntegerType(), True),
])

df = spark.createDataFrame(data, schema=mySchema)
df.show()
df.printSchema()

+----+------------+------+------+------+
|nama|kode_jurusan|nilai1|nilai2|nilai3|
+----+------------+------+------+------+
|Agus|           F|   100|   150|   150|
|Budi|           B|   200|   100|   150|
|Dina|           F|   150|   150|   130|
|Dedi|           B|    50|   100|   100|
+----+------------+------+------+------+

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)



### Menggunakan skema untuk membaca file csv tanpa header

In [55]:
!wget https://raw.githubusercontent.com/urfie/Seri-Belajar-PySpark/main/dataset/mhs.csv

--2023-10-26 13:28:41--  https://raw.githubusercontent.com/urfie/Seri-Belajar-PySpark/main/dataset/mhs.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 113 [text/plain]
Saving to: ‘mhs.csv’


2023-10-26 13:28:41 (2.38 MB/s) - ‘mhs.csv’ saved [113/113]



File csv tanpa header

In [56]:
# Print the raw file to confirm that it has no header row.
!cat mhs.csv

Agus,F,100,150,150
Windy,F,200,150,180
Budi,B,200,100,150
Dina,F,150,150,130
Bayu,F,50,150,100
Dedi,B,50,100,100


In [57]:
# The file has no header line, so the column names and types come from mySchema.
df = spark.read.schema(mySchema).csv("mhs.csv")
df.printSchema()
df.show()

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

+-----+------------+------+------+------+
| nama|kode_jurusan|nilai1|nilai2|nilai3|
+-----+------------+------+------+------+
| Agus|           F|   100|   150|   150|
|Windy|           F|   200|   150|   180|
| Budi|           B|   200|   100|   150|
| Dina|           F|   150|   150|   130|
| Bayu|           F|    50|   150|   100|
| Dedi|           B|    50|   100|   100|
+-----+------------+------+------+------+



In [45]:
# Display the DataFrame's schema as a StructType object.
df.schema

StructType([StructField('nama', StringType(), True), StructField('kode_jurusan', StringType(), True), StructField('nilai1', IntegerType(), True), StructField('nilai2', IntegerType(), True), StructField('nilai3', IntegerType(), True)])

## Create Skema dari DDL String

In [48]:
# Sample rows: a name, a department code, and three scores.
data = [['Agus','F',100,150,150],
        ['Budi','B',200,100,150],
        ['Dina','F',150,150,130],
        ['Dedi','B',200,100,100]]

# Schema written as a DDL string ("column TYPE, ..."). Adjacent literals inside
# parentheses are concatenated, so the source indentation no longer ends up
# inside the string the way it did with a backslash continuation.
ddlString = (
    "nama STRING, kode_jurusan STRING, "
    "nilai1 INT, nilai2 INT, nilai3 INT"
)

df = spark.createDataFrame(data,ddlString)
df.show()
df.printSchema()

+----+------------+------+------+------+
|nama|kode_jurusan|nilai1|nilai2|nilai3|
+----+------------+------+------+------+
|Agus|           F|   100|   150|   150|
|Budi|           B|   200|   100|   150|
|Dina|           F|   150|   150|   130|
|Dedi|           B|   200|   100|   100|
+----+------------+------+------+------+

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)



In [8]:
# Display the schema built from the DDL string as a StructType object.
df.schema

StructType([StructField('nama', StringType(), True), StructField('kode_jurusan', StringType(), True), StructField('nilai1', IntegerType(), True), StructField('nilai2', IntegerType(), True), StructField('nilai3', IntegerType(), True)])

## Update skema dengan `withColumn`

In [9]:
# Replace nilai1 with a decimal-typed copy; the other columns are untouched.
# DecimalType(10, 0) spells out the precision/scale that a bare DecimalType()
# defaults to, matching the decimal(10,0) shown by printSchema().
df_fl = df.withColumn("nilai1", df["nilai1"].cast(DecimalType(10, 0)))
df_fl.printSchema()

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: decimal(10,0) (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)



In [39]:
# Cast all three score columns to float in a single withColumns call.
df_fl = df.withColumns(
    {
        score_col: df[score_col].cast(FloatType())
        for score_col in ("nilai1", "nilai2", "nilai3")
    }
)
df_fl.printSchema()

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: float (nullable = true)
 |-- nilai2: float (nullable = true)
 |-- nilai3: float (nullable = true)



## Menggunakan File JSON Untuk menyimpan dan membaca skema

### Generate JSON string

In [40]:
# Serialise the current DataFrame schema to a JSON string and print it.
json_string = df.schema.json()
print(json_string)

{"fields":[{"metadata":{},"name":"nama","nullable":true,"type":"string"},{"metadata":{},"name":"kode_jurusan","nullable":true,"type":"string"},{"metadata":{},"name":"nilai1","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai2","nullable":true,"type":"integer"},{"metadata":{},"name":"nilai3","nullable":true,"type":"integer"}],"type":"struct"}


### Write to file

In [41]:
# Persist the schema JSON to disk. The context manager closes the file even
# if write() raises, which the manual open()/close() pair did not guarantee.
with open("schema.json", "w") as text_file:
    text_file.write(json_string)

### Read JSON file

In [53]:
import json

# Load the saved schema back as a plain dict. The context manager releases
# the file handle even if JSON parsing fails.
with open("schema.json") as f:
    json_dict = json.load(f)

# Bare last expression so the notebook displays the parsed dict.
json_dict

{'fields': [{'metadata': {},
   'name': 'nama',
   'nullable': True,
   'type': 'string'},
  {'metadata': {}, 'name': 'kode_jurusan', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'nilai1', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'nilai2', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'nilai3', 'nullable': True, 'type': 'integer'}],
 'type': 'struct'}

### Menggunakan JSON Schema untuk definisi Skema DataFrame

In [51]:
# Sample rows: a name, a department code, and three scores.
data = [
    ['Agus', 'F', 100, 150, 150],
    ['Budi', 'B', 200, 100, 150],
    ['Dina', 'F', 150, 150, 130],
    ['Dedi', 'B', 200, 100, 100],
]

# Rebuild a StructType from the dict loaded out of schema.json.
schemaFromJson = StructType.fromJson(json_dict)

df3 = spark.createDataFrame(data, schema=schemaFromJson)
df3.show()
df3.printSchema()

+----+------------+------+------+------+
|nama|kode_jurusan|nilai1|nilai2|nilai3|
+----+------------+------+------+------+
|Agus|           F|   100|   150|   150|
|Budi|           B|   200|   100|   150|
|Dina|           F|   150|   150|   130|
|Dedi|           B|   200|   100|   100|
+----+------------+------+------+------+

root
 |-- nama: string (nullable = true)
 |-- kode_jurusan: string (nullable = true)
 |-- nilai1: integer (nullable = true)
 |-- nilai2: integer (nullable = true)
 |-- nilai3: integer (nullable = true)

