In [1]:
#!pip3 install pandas
#!pip3 install PyArrow
from pyspark.sql.functions import col
import os

In [105]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window

In [3]:
import datetime
import json
import subprocess
import time
import warnings
import xml.etree.ElementTree as ET

import requests

In [4]:
if os.path.exists("/etc/hadoop/conf/hive-site.xml"):
    tree = ET.parse("/etc/hadoop/conf/hive-site.xml")
    root = tree.getroot()
    for prop in root.findall("property"):
        if prop.find("name").text == "hive.metastore.warehouse.dir":
            storage = (
                prop.find("value").text.split("/")[0]
                + "//"
                + prop.find("value").text.split("/")[2]
            )

In [5]:
# Publish the storage root as STORAGE; later cells build their paths from os.environ["STORAGE"].
os.environ.update(STORAGE=storage)

In [6]:
# Echo the storage root derived from hive-site.xml.
print("{}".format(storage))

s3a://go01-demo


In [7]:
# Build (or reuse) the Spark session: S3Guard DynamoDB region, Hadoop access to
# the STORAGE filesystem, RPC message size limit and dynamic allocation.
spark = (
    SparkSession.builder
    .appName("PythonSQL")
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")
    .config("spark.yarn.access.hadoopFileSystems", os.environ["STORAGE"])
    .config("spark.rpc.message.maxSize", "1024")
    .config("spark.dynamicAllocation.enabled", "true")
    # Optional driver sizing; these only take effect if uncommented here,
    # before getOrCreate():
    # .config("spark.driver.cores", 4)
    # .config("spark.driver.memory", "8g")
    .getOrCreate()
)

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco


In [None]:
# Setup clickthrough (run once): load the historical data and save it as the Hive table default.CLICKTHROUGH

In [87]:
# Load the historical clickthrough data. Parquet files carry their own schema,
# so CSV-style options (header / sep / nullValue) do not apply to this reader
# and are not passed.
df = spark.read.parquet(os.environ["STORAGE"] + "/cde-workshop/clickthrough/historical")

                                                                                

In [49]:
# Persist the historical data as the Hive table default.CLICKTHROUGH, replacing any existing table.
df.write.saveAsTable("default.CLICKTHROUGH", format="parquet", mode="overwrite")

                                                                                

In [None]:
# Clickthrough (CT) batch producer: find the latest day in the historical data and load the next day's batch

In [88]:
# Optional parser setting, left disabled:
# spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Cast the string "hour" column to a timestamp, then extract its day of month.
df = (
    df
    .withColumn("converted_date", F.col("hour").cast(TimestampType()))
    .withColumn("day", F.dayofmonth(F.col("converted_date")))
)

In [89]:
# Day of month of the latest date present in the historical data.
# Take the day of the maximum *date* rather than the maximum day-of-month:
# if the data spans a month boundary (e.g. Sep 30 and Oct 1), max(day) would
# return 30 instead of 1. to_date uses the session time zone, like dayofmonth.
latest_date = df.select(F.max(F.to_date("converted_date"))).first()[0]
if latest_date is None:
    raise ValueError("df has no non-null converted_date values; cannot determine the latest day")
yesterday = latest_date.day

                                                                                

In [90]:
# Day of month of the next batch to load.
# NOTE(review): plain integer increment with no month rollover — if yesterday
# is the last day of the month this yields a non-existent day (e.g. 32).
today = yesterday + 1

In [91]:
# Name of today's batch (CSV folder and temp view), e.g. csv_batch_20141026.
# year and month are hardcoded; the day is zero-padded so that days 1-9 still
# produce an 8-digit yyyyMMdd stamp (csv_batch_20141005, not csv_batch_2014105).
year = "2014"
month = "10"
today_string = "csv_batch_{}{}{:02d}".format(year, month, today)
print(today_string)

csv_batch_20141026


In [65]:
# Read today's incremental batch: CSV with a header row, where "NA" marks nulls.
batch_path = "{}/cde-workshop/clickthrough/{}".format(os.environ["STORAGE"], today_string)
batch_df = spark.read.csv(batch_path, header=True, sep=",", nullValue="NA")

#### STRATEGY 1: Add Last Updated Column and Perform Incremental Merge

In [66]:
from pyspark.sql.functions import lit

In [67]:
# Stamp every row of today's batch with its load date (yyyyMMdd -> timestamp).
# The day is zero-padded so single-digit days still match the yyyyMMdd pattern.
batch_df = batch_df.withColumn(
    "LastUpdatedTimestamp",
    F.to_timestamp(F.lit("{}{}{:02d}".format(year, month, today)), "yyyyMMdd"),
)

In [68]:
# Register today's batch as a temporary view named after the batch
# (e.g. csv_batch_20141026) so SQL can query it by that name.
batch_df.createOrReplaceTempView(name=today_string)

In [69]:
# Existing table rows, stamped with the previous day's date so that, when the
# merge keeps the max LastUpdatedTimestamp per id, rows from today's batch win.
# The day is zero-padded so single-digit days still match the yyyyMMdd pattern.
clickthrough_df = spark.sql("SELECT * FROM default.CLICKTHROUGH").withColumn(
    "LastUpdatedTimestamp",
    F.to_timestamp(F.lit("{}{}{:02d}".format(year, month, yesterday)), "yyyyMMdd"),
)

In [70]:
# Display the timestamped copy of default.CLICKTHROUGH built in the previous cell.
clickthrough_df

                                                                                

In [73]:
## TODO: add Quinn DataFrame assertions here (e.g. check that clickthrough_df and batch_df share the same schema before merging)

In [71]:
# Schema of the timestamped existing rows; it must line up column-for-column
# with batch_df.dtypes because the merge query combines them with UNION ALL.
clickthrough_df.dtypes

[('id', 'string'),
 ('click', 'string'),
 ('hour', 'string'),
 ('C1', 'string'),
 ('banner_pos', 'string'),
 ('site_id', 'string'),
 ('site_domain', 'string'),
 ('site_category', 'string'),
 ('app_id', 'string'),
 ('app_domain', 'string'),
 ('app_category', 'string'),
 ('device_id', 'string'),
 ('device_ip', 'string'),
 ('device_model', 'string'),
 ('device_type', 'string'),
 ('device_conn_type', 'string'),
 ('C14', 'string'),
 ('C15', 'string'),
 ('C16', 'string'),
 ('C17', 'string'),
 ('C18', 'string'),
 ('C19', 'string'),
 ('C20', 'string'),
 ('C21', 'string'),
 ('LastUpdatedTimestamp', 'timestamp')]

In [72]:
# The merge query unions the two frames with SELECT * / UNION ALL, which pairs
# columns by position, so both schemas must agree column-for-column. Fail loudly
# here instead of silently shifting values into the wrong columns.
assert batch_df.dtypes == clickthrough_df.dtypes, (
    "batch_df and clickthrough_df schemas differ; UNION ALL would misalign columns"
)
batch_df.dtypes

[('id', 'string'),
 ('click', 'string'),
 ('hour', 'string'),
 ('C1', 'string'),
 ('banner_pos', 'string'),
 ('site_id', 'string'),
 ('site_domain', 'string'),
 ('site_category', 'string'),
 ('app_id', 'string'),
 ('app_domain', 'string'),
 ('app_category', 'string'),
 ('device_id', 'string'),
 ('device_ip', 'string'),
 ('device_model', 'string'),
 ('device_type', 'string'),
 ('device_conn_type', 'string'),
 ('C14', 'string'),
 ('C15', 'string'),
 ('C16', 'string'),
 ('C17', 'string'),
 ('C18', 'string'),
 ('C19', 'string'),
 ('C20', 'string'),
 ('C21', 'string'),
 ('LastUpdatedTimestamp', 'timestamp')]

In [76]:
# Incremental merge: union the existing rows (stamped with yesterday's date)
# with today's batch view, then keep, for every id, the row(s) carrying the
# latest LastUpdatedTimestamp.
#
# The existing rows come from clickthrough_df (the timestamped copy of
# default.CLICKTHROUGH), registered here as a temp view so the query does not
# depend on a pre-existing table that no cell of this notebook creates.
clickthrough_df.createOrReplaceTempView("clickthrough_with_ts")

# The CTE states the union once instead of repeating it in both join inputs.
SQL_STRING = """
  WITH unioned AS (
    SELECT * FROM clickthrough_with_ts
    UNION ALL
    SELECT * FROM {0}
  )
  SELECT unioned.*
  FROM unioned
  JOIN (
    SELECT
      id,
      max(LastUpdatedTimestamp) AS max_date
    FROM unioned
    GROUP BY
      id
  ) grouped
  ON
    unioned.id = grouped.id AND
    unioned.LastUpdatedTimestamp = grouped.max_date
  """.format(today_string)

In [79]:
# Build (lazily) the merged DataFrame from the merge query.
union_df = spark.sql(sqlQuery=SQL_STRING)

In [81]:
# Persist the merged snapshot under a per-batch table name
# (default.CLICKTHROUGH_<today_string>), replacing any existing table.
merged_table = "default.CLICKTHROUGH_" + today_string
union_df.write.saveAsTable(merged_table, format="parquet", mode="overwrite")

                                                                                

In [111]:
# Row count of the merged result. union_df is not cached in this notebook, so
# this presumably re-executes the merge query rather than reading the saved table.
union_df.count()

                                                                                

23609479

22/10/26 04:45:24 490 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /100.100.16.77:52102 is closed


#### STRATEGY 2: Incremental Merge via Union and Window Deduplication (row_number per id)

In [92]:
# Strategy 2 starts from a fresh read of today's CSV batch (header row, "NA"
# marks nulls); this replaces the timestamped batch_df used by Strategy 1.
batch_df = spark.read.csv(
    "{}/cde-workshop/clickthrough/{}".format(os.environ["STORAGE"], today_string),
    header=True,
    sep=",",
    nullValue="NA",
)

In [93]:
# Existing rows straight from the Hive table (no LastUpdatedTimestamp column in this strategy).
clickthrough_df = spark.table("default.CLICKTHROUGH")

In [97]:
#batch_df.dtypes

In [98]:
#clickthrough_df.dtypes

In [99]:
# Stack today's batch and the existing rows. unionByName pairs columns by name,
# so a different column order in the batch CSV header cannot silently shift
# values into the wrong columns (union/unionAll pair columns by position).
df_merge = batch_df.unionByName(clickthrough_df)

In [101]:
# Cast the string "hour" column to a timestamp; it orders the versions of each id below.
hour_ts = F.col("hour").cast(TimestampType())
df_merge = df_merge.withColumn("converted_date", hour_ts)

In [106]:
# Rank each id's versions from newest to oldest so that row 1 is the most recent
# record. Descending order is what makes this an incremental merge: with
# ascending order row 1 would be the oldest version, and an updated record from
# today's batch would be discarded in favour of the stale one.
# NOTE(review): rows of one id with equal converted_date are ranked
# arbitrarily — add a tie-breaker column if that can happen.
latest_first = Window.partitionBy(df_merge["id"]).orderBy(F.col("converted_date").desc())
df_merge = df_merge.withColumn("_row_number", F.row_number().over(latest_first))

In [107]:
# Keep only the top-ranked row of each id, then drop the helper ranking column.
is_top_row = F.col("_row_number") == 1
df_merge = df_merge.filter(is_top_row).drop("_row_number")

In [110]:
# Row count after keeping one row per id; compare with the Strategy 1 count above.
df_merge.count()

                                                                                

23609479