# ETL with Spark (Local)

In [1]:
from pyspark.sql import SparkSession
#from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

# import pyspark.sql.functions as F

In [2]:
data = "data/github_events_01.json"

In [5]:
# APP name, can be any name >>> use in log and terminal name

spark = SparkSession.builder.appName("ETL").getOrCreate()

In [6]:
# read json in multi-line mode

data = spark.read.option("multiline", "true").json(data)

In [7]:
# Print the DataFrame's schema as a tree (column names, types, nullability).
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [36]:
# Quick look at just the event id and the event type of the loaded events.
(
    data
    .select("id", "type")
    .show()
)

+-----------+-----------------+
|         id|             type|
+-----------+-----------------+
|23487929637|IssueCommentEvent|
|23487929676|        PushEvent|
|23487929674|        PushEvent|
|23487929661|        PushEvent|
|23487929682|        PushEvent|
|23487929673|        PushEvent|
|23487929588|        PushEvent|
|23487929636|      CreateEvent|
|23487929580|      IssuesEvent|
|23487929591|        PushEvent|
|23487929533|        PushEvent|
|23487929573|        PushEvent|
|23487929349|        PushEvent|
|23487929578|        PushEvent|
|23487929597|IssueCommentEvent|
|23487929522|     ReleaseEvent|
|23487929560|        PushEvent|
|23487929536|        PushEvent|
|23487929501|      DeleteEvent|
|23487929523|        PushEvent|
+-----------+-----------------+
only showing top 20 rows



In [7]:
# Register the DataFrame as the temporary view "staging_events" so that it
# can be queried by name through spark.sql().
data.createOrReplaceTempView("staging_events")

In [8]:
# Preview every column of the staging view.
# show() only prints and returns None, so the DataFrame is bound to `table`
# first and displayed in a separate step; chaining .show() onto the
# assignment would leave `table` set to None.
table = spark.sql("""
    select
        *
        
    from
        staging_events
""")
table.show()

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|             type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|{https://avatars....|2022-08-17T15:51:05Z|23487929637|{https://avatars....|{created, {COLLAB...|  true|{75340147, 350org...|IssueCommentEvent|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+



In [19]:
# Flatten the staging view into the columns used for the exports:
# the event id/type/timestamp, the date and year derived from the timestamp,
# the actor's URL and the organisation id taken from the nested structs.
flat_events_sql = """
    select
        id
        , type
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        , actor.url
        , org.id as org_id
        
    from
        staging_events
"""
table = spark.sql(flat_events_sql)

In [26]:
# Display the flattened result (id, type, created_at, date, year, url, org_id).
table.show()

+-----------+-----------------+--------------------+----------+----+--------------------+--------+
|         id|             type|          created_at|      date|year|                 url|  org_id|
+-----------+-----------------+--------------------+----------+----+--------------------+--------+
|23487929637|IssueCommentEvent|2022-08-17T15:51:05Z|2022-08-17|2022|https://api.githu...|24305026|
+-----------+-----------------+--------------------+----------+----+--------------------+--------+



In [21]:
# Destination folders for the CSV and the Parquet exports.
output_csv, output_parquet = "output_csv", "output_parquet"

In [22]:
# The data is written into a folder, not into a single file.
# Output is partitioned by `year`; an existing export is overwritten.
(
    table.write
    .partitionBy("year")
    .mode("overwrite")
    .csv(output_csv)
)

In [23]:
# Same export as Parquet: partitioned by `year`, replacing any previous output.
(
    table.write
    .partitionBy("year")
    .mode("overwrite")
    .parquet(output_parquet)
)

In [27]:
# Project the staging view with year, month and day derived from created_at,
# then export it as CSV partitioned by those three columns.
daily_events_sql = """
    select
        id
        , type
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        , month(created_at) as month
        , day(created_at) as day
        , actor.url
        , org.id as org_id
        
    from
        staging_events
"""
table2 = spark.sql(daily_events_sql)

destination = "events"
(
    table2.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .csv(destination)
)

In [30]:
# Project the staging view (including the derived date parts) and export it
# as CSV partitioned by the single `date` column.
dated_events_sql = """
    select
        id
        , type
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        , month(created_at) as month
        , day(created_at) as day
        , actor.url
        , org.id as org_id
        
    from
        staging_events
"""
table3 = spark.sql(dated_events_sql)

destination = "events_2"
(
    table3.write
    .partitionBy("date")
    .mode("overwrite")
    .csv(destination)
)

In [None]:
# Read data files in FOLDER

In [37]:
# Folder that is handed to the JSON reader instead of a single file path.
data_folder = "data"

In [38]:
# Point the reader at the folder rather than at one file, using the same
# multi-line JSON option. Note that this rebinds `data` to the new DataFrame.
data = (
    spark.read
    .option("multiline", "true")
    .json(data_folder)
)

In [39]:
# Preview the first rows of the DataFrame read from the folder.
data.show()

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|                type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|          WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|         CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|           PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CM