ETL with Spark (Local)

In [6]:
from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

# import pyspark.sql.functions as F

In [7]:
# Create the SparkSession used by every cell below (an already running
# session is reused). The app name is arbitrary; it only identifies this
# job, e.g. in the logs.

spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

In [14]:
# Folder that holds the input JSON files read by the load cell below
data_folder = "data"

In [15]:
# Read every JSON file in `data_folder` (currently 2 files) into one DataFrame.
# `data_folder` is defined once in the cell above; it is not redefined here so
# the path has a single source of truth.
# The "multiline" option lets Spark parse JSON documents that span several
# lines instead of expecting one record per line.

data = spark.read.option("multiline", "true").json(data_folder)
data.show(3)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|       type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...| WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|  PushEvent|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------+
only showing top 3 rows



In [16]:
# Sanity-check the DataFrame API: pick one top-level column and one field
# nested inside the `payload` struct, then display the first 3 rows.

(
    data
    .select("id", "payload.action")
    .show(3)
)

+-----------+-------+
|         id| action|
+-----------+-------+
|23487963576|started|
|23487963624|   null|
|23487963529|   null|
+-----------+-------+
only showing top 3 rows



In [17]:
# Print the DataFrame schema as a tree: column names, types and nullability,
# including the fields nested inside the struct columns.

data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [18]:
# Register the DataFrame as the temp view "staging_events" so that the
# following cells can query it with spark.sql(...).

data.createOrReplaceTempView("staging_events")

In [19]:
# Run the same id / payload.action check through Spark SQL on the temp view
# and display the first 3 rows.

staging_preview = spark.sql("""
    select
        id
        , payload.action
    from
        staging_events
""")
staging_preview.show(3)

+-----------+-------+
|         id| action|
+-----------+-------+
|23487963576|started|
|23487963624|   null|
|23487963529|   null|
+-----------+-------+
only showing top 3 rows



In [20]:
# Create table 'actors', partitioned by operation date (date_oprt).
#
# `select distinct` keeps one row per distinct actor record. Without it the
# table holds one row per *event*, so an actor with several events is
# duplicated and any join on actors.id multiplies the joined rows.

table_actor = spark.sql("""
    select distinct
        actor.id
        , actor.avatar_url
        , actor.display_login
        , actor.gravatar_id
        , actor.login
        , actor.url
        , current_date() as date_oprt
    from
        staging_events
""")

# mode("overwrite") alone replaces the whole "actors" folder, i.e. it also
# deletes the partitions written on earlier operation dates. With
# partitionOverwriteMode=dynamic only the partitions present in this
# DataFrame (the current date_oprt) are replaced.
(
    table_actor.write
    .partitionBy("date_oprt")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .option("header", True)
    .csv("actors")
)

table_actor.createOrReplaceTempView("actors")

In [21]:
# Create table 'repos', partitioned by operation date (date_oprt).
#
# `select distinct` keeps one row per distinct repo record. Without it the
# table holds one row per *event*, so a repo with several events is
# duplicated and any join on repos.id multiplies the joined rows.

table_repo = spark.sql("""
    select distinct
        repo.id
        , repo.name
        , repo.url
        , current_date() as date_oprt
    from
        staging_events
""")

# mode("overwrite") alone replaces the whole "repos" folder, i.e. it also
# deletes the partitions written on earlier operation dates. With
# partitionOverwriteMode=dynamic only the partitions present in this
# DataFrame (the current date_oprt) are replaced.
(
    table_repo.write
    .partitionBy("date_oprt")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .option("header", True)
    .csv("repos")
)

table_repo.createOrReplaceTempView("repos")

In [22]:
# Create table 'orgs', partitioned by operation date (date_oprt).
#
# Two clean-ups compared with a plain projection of staging_events:
#   * `select distinct` keeps one row per distinct org record instead of one
#     row per event, so a join on orgs.id does not multiply rows.
#   * `org.id is not null` drops events that have no organisation; they would
#     otherwise produce rows whose columns are all null. Such rows can never
#     match a join on id, so callers joining on orgs.id are unaffected.

table_org = spark.sql("""
    select distinct
        org.id
        , org.avatar_url
        , org.gravatar_id
        , org.login
        , org.url
        , current_date() as date_oprt
    from
        staging_events
    where
        org.id is not null
""")

# mode("overwrite") alone replaces the whole "orgs" folder, i.e. it also
# deletes the partitions written on earlier operation dates. With
# partitionOverwriteMode=dynamic only the partitions present in this
# DataFrame (the current date_oprt) are replaced.
(
    table_org.write
    .partitionBy("date_oprt")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .option("header", True)
    .csv("orgs")
)

table_org.createOrReplaceTempView("orgs")

In [23]:
# Create table 'events', partitioned by operation date (date_oprt).
# One row per staged event; id_actor / id_repo / id_org are the keys used to
# join to the 'actors', 'repos' and 'orgs' tables.

table_event = spark.sql("""
    select
        id
        , created_at
        , public
        , type
        , actor.id as id_actor
        , repo.id as id_repo
        , org.id as id_org
        , current_date() as date_oprt
    from
        staging_events
""")

# mode("overwrite") alone replaces the whole "events" folder, i.e. it also
# deletes the partitions written on earlier operation dates. With
# partitionOverwriteMode=dynamic only the partitions present in this
# DataFrame (the current date_oprt) are replaced.
(
    table_event.write
    .partitionBy("date_oprt")
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .option("header", True)
    .csv("events")
)

table_event.createOrReplaceTempView("events")

In [25]:
# Join the events of the current operation date to their actor, repo and
# (optional) org.
#
# All four tables stamp date_oprt with current_date(), so the filters use
# current_date() as well. A hard-coded day such as '2022-11-07' only matches
# when the notebook is run on that exact date and returns nothing otherwise.
# orgs is a left join because many events have no organisation.
spark.sql("""
    select
        e.created_at
        , a.display_login
        , r.name
        , r.url
        , o.login
    from events e
    inner join actors a
        on a.date_oprt = current_date() and a.id = e.id_actor
    inner join repos r
        on r.date_oprt = current_date() and r.id = e.id_repo
    left join orgs o
        on o.date_oprt = current_date() and o.id = e.id_org
    where
        e.date_oprt = current_date()
""").show()

+--------------------+--------------+--------------------+--------------------+---------------+
|          created_at| display_login|                name|                 url|          login|
+--------------------+--------------+--------------------+--------------------+---------------+
|2022-08-17T15:52:40Z|    evilgaoshu|spring-projects/s...|https://api.githu...|spring-projects|
|2022-08-17T15:52:40Z|      gurram47|gurram47/AP201100...|https://api.githu...|           null|
|2022-08-17T15:52:40Z|    afbeltranr| afbeltranr/Agrilab2|https://api.githu...|           null|
|2022-08-17T15:52:40Z|      karla-vm|CMSgov/cms-carts-...|https://api.githu...|         CMSgov|
|2022-08-17T15:52:40Z|       hsluoyz|casdoor/casdoor-c...|https://api.githu...|        casdoor|
|2022-08-17T15:52:40Z|       mnw1020|    mnw1020/obsidian|https://api.githu...|           null|
|2022-08-17T15:52:40Z|        ikjo93|ikjo93/Data-Struc...|https://api.githu...|           null|
|2022-08-17T15:52:40Z|       Gabe616|Gab