# Doing the samething but with sparkSQL

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('spark-read-from-bigquery') \
                    .config('parentProject', 'khung-playground') \
                    .config('spark.jars', '../spark-bigquery-latest_2.12.jar') \
                    .config("credentialsFile", "../khung-playground-cb7110dd8c95.json").getOrCreate()


In [2]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def explode_nested_type(df: DataFrame, data_type = "array"):
    
    normal_column = []
    to_be_explode_column = []
    explode = []

    for column in df.dtypes:
        if column[1].startswith(data_type):
            to_be_explode_column.append(column)
        else:
            normal_column.append(column[0])

    # explode the column
    for column in to_be_explode_column:
        
        if data_type == "array":
            explode.append(F.explode(df[column[0]]).alias(column[0]))
        elif data_type == "struct":
            explode.append(F.col(f"{column[0]}.*"))

    # put all back together
    df = df.select(
        *normal_column,
        *explode
    )
    
    return df

In [3]:
TABLE_LANGUAGE = "khung-playground.github.languages"
TABLE_COMMIT = "khung-playground.github.commits"
LANGUAGE = "Python"

### Language Dataset

In [4]:
# Creating DataFrames for language
df_language = spark.read.format('bigquery').option('table', TABLE_LANGUAGE).load()

In [5]:
df_language = explode_nested_type(df_language)
df_language = explode_nested_type(df_language, "struct")

In [6]:
df_language.printSchema()

root
 |-- repo_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- bytes: long (nullable = true)



In [7]:
df_language.createOrReplaceTempView("language")

In [36]:
df_language = spark.sql(f'''
    SELECT repo_name, name, bytes
    FROM language 
    WHERE name == "{LANGUAGE}"
    GROUP BY repo_name, name, bytes
''')
df_language.createOrReplaceTempView("language_dedup")

#### Commit Dataset

In [26]:
df_commit = spark.read.format('bigquery').option('table', TABLE_COMMIT).load()
df_commit.createOrReplaceTempView("commit")
df_commit = spark.sql("""
    SELECT commit, committer, repo_name
    FROM commit
""")

In [31]:
df_commit = explode_nested_type(df_commit, "struct")
df_commit.createOrReplaceTempView("commit_unnested")

In [32]:
df_commit.printSchema()

root
 |-- commit: string (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- time_sec: long (nullable = true)
 |-- tz_offset: long (nullable = true)
 |-- date: timestamp (nullable = true)



In [33]:
df_commit = spark.sql("""
    SELECT commit,
           repo_name,
           name as commiter_name,
           email,
           time_sec,
           tz_offset,
           date
    FROM commit_unnested
""")

#### I aware there are records of commits are at exactally same time and same commiter, In my asscept this doesn't make sense so I see this as duplicate hence drop it.

#### can not find dropDuplicates builtin finction in sparkSQL

In [35]:
duplicate_data = ["commiter_name", "email", "time_sec", "tz_offset", "date", "repo_name"]
df_commit = df_commit.dropDuplicates(duplicate_data)
df_commit.createOrReplaceTempView("commit_unnested_renamed")

#### Join df_language with df_commit

In [53]:
df_commit_language = spark.sql("""
    SELECT 
        cur.commiter_name,
        cur.commit,
        cur.email,
        cur.time_sec,
        cur.tz_offset,
        cur.date,
        cur.repo_name,
        ld.name, 
        ld.bytes
    FROM commit_unnested_renamed as cur
    JOIN language_dedup as ld ON cur.repo_name == ld.repo_name
""")
df_commit_language.createOrReplaceTempView("commit_language_joined")

In [55]:
test = spark.sql("select * from commit_language_joined limit 10")

In [56]:
test.printSchema()

root
 |-- commiter_name: string (nullable = true)
 |-- commit: string (nullable = true)
 |-- email: string (nullable = true)
 |-- time_sec: long (nullable = true)
 |-- tz_offset: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- bytes: long (nullable = true)



#### Window function on the merged data in order to calculate time distribution of two commits

In [59]:
df_commit_language_windowed = spark.sql("""
    select
        commiter_name,
        repo_name,
        commit,
        email,
        time_sec,
        tz_offset,
        date,
        name,
        bytes,
        row_number() over(PARTITION BY repo_name order by date) as commit_seq,
        lag(time_sec) over(PARTITION BY repo_name order by date) as lag_time_sec
    from commit_language_joined
""")
df_commit_language_windowed.createOrReplaceTempView("commit_language_windowed")

In [60]:
df_commit_language_windowed = spark.sql("""
    select
        *,
        time_sec - lag_time_sec as prv_time_diff
    from commit_language_windowed
""")

In [61]:
df_commit_language_windowed.printSchema()

root
 |-- commiter_name: string (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- commit: string (nullable = true)
 |-- email: string (nullable = true)
 |-- time_sec: long (nullable = true)
 |-- tz_offset: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- name: string (nullable = true)
 |-- bytes: long (nullable = true)
 |-- commit_seq: integer (nullable = false)
 |-- lag_time_sec: long (nullable = true)
 |-- prv_time_diff: long (nullable = true)



In [62]:
df_commit_language_windowed.show()

+------------------+----------------+--------------------+--------------------+----------+---------+-------------------+------+-----+----------+------------+-------------+
|     commiter_name|       repo_name|              commit|               email|  time_sec|tz_offset|               date|  name|bytes|commit_seq|lag_time_sec|prv_time_diff|
+------------------+----------------+--------------------+--------------------+----------+---------+-------------------+------+-----+----------+------------+-------------+
|       Erich Gamma|Microsoft/vscode|8f35cc4768393b254...|8c10e1560732a7a60...|1447421978|       60|2015-11-13 13:39:38|Python| 2405|         1|        null|         null|
|        Chris Dias|Microsoft/vscode|6f9e2ae3907632e2f...|711c73f64afdce07b...|1447426118|       60|2015-11-13 14:48:38|Python| 2405|         2|  1447421978|         4140|
|   Benjamin Pasero|Microsoft/vscode|0a2f0cbc5c7ebc457...|d377d17da589177b3...|1447428762|       60|2015-11-13 15:32:42|Python| 2405|       