# ETL with Spark (Local)

In [1]:
from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, TimestampType

# import pyspark.sql.functions as F

In [72]:
import pandas as pd
import glob

In [75]:
p = glob.glob("data/*.json")

In [77]:
p

['data/github_events_01.json', 'data/github_events_02.json']

In [76]:
pd.read_json(p)

ValueError: Invalid file path or buffer object type: <class 'list'>

In [2]:
data = "github_events_01.json"

In [56]:
data_2 = "github_events_02.json"

In [3]:
spark = SparkSession.builder \
    .appName("ETL") \
    .getOrCreate()

In [64]:
data_folder = "data"

In [65]:
data = spark.read.option("multiline", "true").json(data_folder)

In [62]:
# data = spark.read.option("multiline", "true").json(data_2)

In [66]:
data.show()

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|                type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|{https://avatars....|2022-08-17T15:52:40Z|23487963576|{https://avatars....|{started, null, n...|  true|{6296790, spring-...|          WatchEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963624|                null|{null, null, null...|  true|{525860969, gurra...|         CreateEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963529|                null|{null, e80c84c7bb...|  true|{350706029, afbel...|           PushEvent|
|{https://avatars....|2022-08-17T15:52:40Z|23487963558|{https://avatars....|{created, null, {...|  true|{226399669, CM

In [58]:
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- author_association: string (nullable = true)
 |    |    |-- body: string (nullable = true)
 |    |    |-- created_at: string (nullable = true)
 |    |    |-- html_url: string (nullable 

In [6]:
data.select("id", "type").show()

+-----------+-----------------+
|         id|             type|
+-----------+-----------------+
|23487929637|IssueCommentEvent|
+-----------+-----------------+



In [7]:
data.createOrReplaceTempView("staging_events")

In [8]:
table = spark.sql("""
    select
        *
        
    from
        staging_events
""").show()

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|             type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+
|{https://avatars....|2022-08-17T15:51:05Z|23487929637|{https://avatars....|{created, {COLLAB...|  true|{75340147, 350org...|IssueCommentEvent|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+-----------------+



In [24]:
table = spark.sql("""
    select
        id
        , type
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        , actor.login
        , actor.url as actor_url
        , repo.name
        , repo.url as repo_url
        
    from
        staging_events
""")

In [25]:
table.show()

+-----------+-----------------+--------------------+----------+----+-------+--------------------+-----------------+--------------------+
|         id|             type|          created_at|      date|year|  login|           actor_url|             name|            repo_url|
+-----------+-----------------+--------------------+----------+----+-------+--------------------+-----------------+--------------------+
|23487929637|IssueCommentEvent|2022-08-17T15:51:05Z|2022-08-17|2022|sukhada|https://api.githu...|350org/ak_intl_v3|https://api.githu...|
+-----------+-----------------+--------------------+----------+----+-------+--------------------+-----------------+--------------------+



In [26]:
output_csv = "output_csv"
output_parquet = "output_parquet"

In [27]:
table.write.partitionBy("year").mode("overwrite").csv(output_csv)

In [28]:
table.write.partitionBy("year").mode("overwrite").parquet(output_parquet)

In [53]:
table = spark.sql("""
    select
        id
        , type
        , created_at
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date
    from
        staging_events
""")

In [54]:
table.show()

+-----------+-----------------+--------------------+---+-----+----+----------+
|         id|             type|          created_at|day|month|year|      date|
+-----------+-----------------+--------------------+---+-----+----+----------+
|23487929637|IssueCommentEvent|2022-08-17T15:51:05Z| 17|    8|2022|2022-08-17|
+-----------+-----------------+--------------------+---+-----+----+----------+



In [50]:
destination = "events"

In [52]:
table.write.partitionBy("year", "month", "day").mode("overwrite").csv(destination)

In [55]:
table.write.partitionBy("date").mode("overwrite").csv(destination)

In [37]:
table = spark.sql("""
    select
        actor.login
        , id as event_id
        , actor.url as actor_url
    from
        staging_events
""")
destination = "actors"
table.write.mode("overwrite").csv(destination)

In [38]:
table = spark.sql("""
    select
        repo.name
        , id as event_id
        , repo.url as repo_url
        
    from
        staging_events
""")
destination = "repos"
table.write.mode("overwrite").csv(destination)