SQL adalah salah satu bahasa populer untuk pemrosesan dan analisis data. Spark mendukung SQL untuk memproses DataFrame. Dalam latihan ini kita akan menggunakan spark SQL tanpa database.


In [None]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
from pyspark.sql import SparkSession

Inisialisasi spark session untuk berinteraksi dengan Spark cluster.

In [None]:
spark = SparkSession.builder.appName('DataFrame Basics').getOrCreate()

Download dataset

In [None]:
!wget https://github.com/urfie/SparkSQL-dengan-Hive/raw/main/datasets/application_record_header.csv.gz

Load ke dataframe

In [None]:
df = spark.read.csv("application_record_header.csv.gz", header=True, inferSchema=True)

Sebelum menggunakan SQL, kita perlu membuat temporary table dari dataframe yang akan kita olah.

Gunakan fungsi `createOrReplaceTempView(nama_tabel)` pada dataframe tersebut.

In [None]:
df.createOrReplaceTempView("app_record")

Selanjutnya kita bisa menggunakan nama tabel yang sudah kita definisikan dalam SQL statement.

Untuk mengeksekusi SQL statement, kita gunakan fungsi `sql(sqlstatement)` pada spark session.

In [None]:
spark.sql("select count(*) from app_record").show()

In [None]:
spark.sql("select * from app_record limit 5").show()

Kita akan melakukan join salah satu kolom dengan data referensi dan melakukan agregasi. Sebelumnya kita buat dataframe referensi dan membuat temporary viewnya

In [None]:
mydata = (
    ('Lower secondary',1),
    ('Secondary / secondary special',2),
    ('Academic degree',3),
    ('Incomplete higher',4),
    ('Higher education',5))

ref_edu = spark.createDataFrame(mydata).toDF("NAME_EDUCATION_TYPE", "EDU_LEVEL")
ref_edu.createOrReplaceTempView("ref_edu")
spark.sql("select * from ref_edu").show()

Kita lakukan join, agregat, kemudian kita simpan ke tabel

In [None]:
spark.sql("""SELECT edu_level, count(1) as number_of_app FROM
              (SELECT ref_edu.EDU_LEVEL as edu_level
                FROM app_record LEFT JOIN ref_edu
                ON app_record.NAME_EDUCATION_TYPE=ref_edu.NAME_EDUCATION_TYPE)
             GROUP BY edu_level SORT BY edu_level""").write.saveAsTable(name="aggregated_edu", mode="overwrite")

Kita tampilkan hasilnya dengan menggunakan perintah `describe formatted`

In [None]:
spark.sql("describe formatted aggregated_edu").show(truncate = False)

In [None]:
%ls -l /content/spark-warehouse/aggregated_edu/