### Import Libraries

In [25]:
import os

import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession

**Create Variables for Credentials**

In [26]:
# E-mail identity of the service account associated with this data lake.
SERVICE_ACCOUNT_EMAIL = (
    "ds525-dw-04-datalake-sa"
    "@ds525-dw.iam.gserviceaccount.com"
)

# Relative path to the JSON key file; it is exported later as
# GOOGLE_APPLICATION_CREDENTIALS.
KEYFILE_PATH = (
    "credentials/"
    "ds525-dw-654a5027256c.json"
)

**Create variables for Directories**

In [27]:
# Root of the data lake bucket; every zone below is a direct child of it.
bucket = "gs://ds525-dw-04-datalake-49"

# Build the four zone paths in one pass: "<bucket>/<zone name>".
landing_zone, cleaned_zone_csv, cleaned_zone_parquet, events_dir = (
    f"{bucket}/{zone_name}"
    for zone_name in ("landing", "cleaned_csv", "cleaned_parquet", "events")
)

**Set OS environment variable with credentials key**

In [28]:
# Point the process environment at the service-account key file.
os.environ.update(GOOGLE_APPLICATION_CREDENTIALS=KEYFILE_PATH)

### Spark Config

In [29]:
# Create the Spark configuration and register the GCS connector jar in one
# fluent call (SparkConf.set returns the same SparkConf instance).
# NOTE(review): the "-latest" artifact in the URL is not pinned to a version,
# so reruns may pick up a different connector build — consider pinning.
conf = SparkConf().set(
    "spark.jars",
    "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
)
conf

<pyspark.conf.SparkConf at 0x7f5aaf780790>

In [30]:
# Turn on eager evaluation for the REPL so DataFrames render without .show().
conf.setAll([("spark.sql.repl.eagerEval.enabled", True)])

<pyspark.conf.SparkConf at 0x7f5aaf780790>

In [31]:
# Build the session from `conf`, or reuse one that is already running.
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

## Spark for Data Processing

### Test Connection to Google Cloud Service

by reading a JSON file from the bucket

In [32]:
# Show the landing-zone URI that the reads below will use.
print(landing_zone)

gs://ds525-dw-04-datalake-49/landing


In [33]:
#Read JSON data from GCS
#df1 = pd.read_json(f"{landing_zone}/github_events_01.json")

In [34]:
# df1.head(10)

### Use Spark to read all files in 'landing_zone' Directory


In [35]:
# Reader configured with the "multiline" option, then pointed at the whole
# landing directory so every file in it is loaded into one DataFrame.
landing_reader = spark.read.option("multiline", "true")
data = landing_reader.json(landing_zone)

In [36]:
# Preview the raw events (first 20 rows, long cell values truncated).
data.show(n=20, truncate=True)

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|                type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|{https://avatars....|2022-08-17T15:53:42Z|23487985360|                NULL|{NULL, c61cf9149b...|  true|{523499555, emine...|           PushEvent|
|{https://avatars....|2022-08-17T15:50:35Z|23487985346|{https://avatars....|{created, NULL, {...|  true|{47574295, cncf/t...|PullRequestReview...|
|{https://avatars....|2022-08-17T15:53:42Z|23487985321|{https://avatars....|{created, NULL, N...|  true|{47574295, cncf/t...|PullRequestReview...|
|{https://avatars....|2022-08-17T15:53:42Z|23487985350|                NULL|{created, NULL, N...|  true|{518116339, Ha

In [37]:
# Print the schema of the loaded events as a tree, including the nested
# struct columns (actor, org, payload, ...).
data.printSchema()

root
 |-- actor: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- display_login: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: struct (nullable = true)
 |    |-- avatar_url: string (nullable = true)
 |    |-- gravatar_id: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- login: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- before: string (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- _links: struct (nullable = true)
 |    |    |    |-- html: struct (nullable = true)
 |    |    |    |    |-- href: string (nullable = true)
 |    |    |    |-- pull_request: struct (nul

In [38]:
# Narrow the frame to the event id and event type, then preview it.
event_ids_and_types = data.select("id", "type")
event_ids_and_types.show()

+-----------+--------------------+
|         id|                type|
+-----------+--------------------+
|23487985360|           PushEvent|
|23487985346|PullRequestReview...|
|23487985321|PullRequestReview...|
|23487985350|PullRequestReview...|
|23487985373|PullRequestReview...|
|23487985331|PullRequestReview...|
|23487985353|         IssuesEvent|
|23487985271|           PushEvent|
|23487985289|           PushEvent|
|23487985292|          WatchEvent|
|23487985343|          WatchEvent|
|23487985312|         IssuesEvent|
|23487985326|PullRequestReview...|
|23487985276|           PushEvent|
|23487985280|           PushEvent|
|23487985281|           PushEvent|
|23487985272|           PushEvent|
|23487985270|         CreateEvent|
|23487985293|    PullRequestEvent|
|23487985250|PullRequestReview...|
+-----------+--------------------+
only showing top 20 rows



### Create a Temporary View from the DataFrame

In [39]:
# Expose the raw events to Spark SQL under the name `staging_events`
# (replaces any existing temp view of that name).
data.createOrReplaceTempView(name="staging_events")

In [40]:
# Preview every column of the staging view through Spark SQL.
# DataFrame.show() returns None, so the query result is bound to `table`
# first and displayed in a separate statement; chaining .show() onto the
# assignment would leave `table` set to None.
table = spark.sql("""
    select
        *

    from
        staging_events
""")
table.show()

+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|               actor|          created_at|         id|                 org|             payload|public|                repo|                type|
+--------------------+--------------------+-----------+--------------------+--------------------+------+--------------------+--------------------+
|{https://avatars....|2022-08-17T15:53:42Z|23487985360|                NULL|{NULL, c61cf9149b...|  true|{523499555, emine...|           PushEvent|
|{https://avatars....|2022-08-17T15:50:35Z|23487985346|{https://avatars....|{created, NULL, {...|  true|{47574295, cncf/t...|PullRequestReview...|
|{https://avatars....|2022-08-17T15:53:42Z|23487985321|{https://avatars....|{created, NULL, N...|  true|{47574295, cncf/t...|PullRequestReview...|
|{https://avatars....|2022-08-17T15:53:42Z|23487985350|                NULL|{created, NULL, N...|  true|{518116339, Ha

### Restructure the Temporary Table

In [41]:
# Flatten the staging view: keep the event id/type/timestamp, derive `date`
# and `year` from `created_at`, and pull login/url out of the `actor` struct
# and name/url out of the `repo` struct.
restructure_query = """
    select
        id
        , type
        , created_at
        , to_date(created_at) as date
        , year(created_at) as year
        , actor.login
        , actor.url as actor_url
        , repo.name
        , repo.url as repo_url

    from
        staging_events
"""
table = spark.sql(restructure_query)

In [42]:
# Preview the flattened table (first 20 rows, long cell values truncated).
table.show(n=20, truncate=True)

+-----------+--------------------+--------------------+----------+----+-----------------+--------------------+--------------------+--------------------+
|         id|                type|          created_at|      date|year|            login|           actor_url|                name|            repo_url|
+-----------+--------------------+--------------------+----------+----+-----------------+--------------------+--------------------+--------------------+
|23487985360|           PushEvent|2022-08-17T15:53:42Z|2022-08-17|2022|       emineliyev|https://api.githu...|emineliyev/mtv_pr...|https://api.githu...|
|23487985346|PullRequestReview...|2022-08-17T15:50:35Z|2022-08-17|2022|        by-d-sign|https://api.githu...|            cncf/toc|https://api.githu...|
|23487985321|PullRequestReview...|2022-08-17T15:53:42Z|2022-08-17|2022|        by-d-sign|https://api.githu...|            cncf/toc|https://api.githu...|
|23487985350|PullRequestReview...|2022-08-17T15:53:42Z|2022-08-17|2022|         au

### Save Structured Data in the Cleaned Zone


In [43]:
# CSV copy of the flattened table, partitioned by `year`; "overwrite" mode
# replaces whatever is already at the target path.
csv_writer = table.write.partitionBy("year").mode("overwrite")
csv_writer.csv(cleaned_zone_csv)

In [44]:
# Parquet copy of the same table, partitioned by `year`; "overwrite" mode
# replaces whatever is already at the target path.
parquet_writer = table.write.partitionBy("year").mode("overwrite")
parquet_writer.parquet(cleaned_zone_parquet)

### Create the Events Table

In [45]:
# Events table: event id/type/timestamp plus day, month, year and date
# columns derived from `created_at`.
events_query = """
    select
        id
        , type
        , created_at
        , day(created_at) as day
        , month(created_at) as month
        , year(created_at) as year
        , date(created_at) as date
    from
        staging_events
"""
table = spark.sql(events_query)

In [46]:
# Preview the events table (first 20 rows, long cell values truncated).
table.show(n=20, truncate=True)

+-----------+--------------------+--------------------+---+-----+----+----------+
|         id|                type|          created_at|day|month|year|      date|
+-----------+--------------------+--------------------+---+-----+----+----------+
|23487985360|           PushEvent|2022-08-17T15:53:42Z| 17|    8|2022|2022-08-17|
|23487985346|PullRequestReview...|2022-08-17T15:50:35Z| 17|    8|2022|2022-08-17|
|23487985321|PullRequestReview...|2022-08-17T15:53:42Z| 17|    8|2022|2022-08-17|
|23487985350|PullRequestReview...|2022-08-17T15:53:42Z| 17|    8|2022|2022-08-17|
|23487985373|PullRequestReview...|2022-08-17T15:24:44Z| 17|    8|2022|2022-08-17|
|23487985331|PullRequestReview...|2022-08-17T15:23:27Z| 17|    8|2022|2022-08-17|
|23487985353|         IssuesEvent|2022-08-17T15:53:42Z| 17|    8|2022|2022-08-17|
|23487985271|           PushEvent|2022-08-17T15:53:42Z| 17|    8|2022|2022-08-17|
|23487985289|           PushEvent|2022-08-17T15:53:42Z| 17|    8|2022|2022-08-17|
|23487985292|   

In [47]:
# Write the events partitioned by year/month/day.
# The following cell writes to `events_dir` in "overwrite" mode with a
# different partitioning, which would wipe this output if both shared the
# same path. This layout therefore gets its own sibling directory.
events_by_ymd_dir = f"{events_dir}_by_ymd"
table.write.partitionBy("year", "month", "day").mode("overwrite").csv(events_by_ymd_dir)

In [48]:
# Write the events partitioned by `date`; "overwrite" mode replaces whatever
# is already stored under `events_dir`.
events_writer = table.write.partitionBy("date").mode("overwrite")
events_writer.csv(events_dir)

In [None]:
# Actor table: one row per event with the actor's login and URL.
table = spark.sql("""
    select
        actor.login
        , id as event_id
        , actor.url as actor_url
    from
        staging_events
""")
destination = "actors"
# Write under the bucket's own `actors` directory. `destination` was assigned
# but never used, and the output went to `events_dir` in "overwrite" mode,
# which replaced the events data written earlier.
table.write.mode("overwrite").csv(f"{bucket}/{destination}")

In [None]:
# Repo table: one row per event with the repository's name and URL.
table = spark.sql("""
    select
        repo.name
        , id as event_id
        , repo.url as repo_url

    from
        staging_events
""")
destination = "repos"
# Write under the bucket's own `repos` directory. `destination` was assigned
# but never used, and the output went to `events_dir` in "overwrite" mode,
# which replaced whatever had been written there before.
table.write.mode("overwrite").csv(f"{bucket}/{destination}")