In [1]:
#!pip3 install pandas
#!pip3 install PyArrow
from pyspark.sql.functions import col

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

In [3]:
import os
import time
import json
import requests
import xml.etree.ElementTree as ET
import datetime
import subprocess

In [4]:
if os.path.exists("/etc/hadoop/conf/hive-site.xml"):
    tree = ET.parse("/etc/hadoop/conf/hive-site.xml")
    root = tree.getroot()
    for prop in root.findall("property"):
        if prop.find("name").text == "hive.metastore.warehouse.dir":
            storage = (
                prop.find("value").text.split("/")[0]
                + "//"
                + prop.find("value").text.split("/")[2]
            )

In [5]:
os.environ["STORAGE"] = storage

In [6]:
print(storage)

s3a://go01-demo


In [30]:
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-2")\
    .config("spark.yarn.access.hadoopFileSystems",os.environ["STORAGE"])\
    .config("spark.rpc.message.maxSize", "1024")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .getOrCreate()
 #   .config("spark.driver.cores", 4)\
 #   .config("spark.driver.memory", "8g")\
    

In [157]:
customers_df = spark.read.csv(os.environ["STORAGE"]+'/cde-workshop/clickthrough/customers/data', header=True)

In [158]:
#customers_df.select("customer_id").show()

In [10]:
!pip3 install faker



In [159]:
customers_df.dtypes

[('customer_id', 'string'),
 ('username', 'string'),
 ('name', 'string'),
 ('gender', 'string'),
 ('email', 'string'),
 ('occupation', 'string'),
 ('birthdate', 'string'),
 ('address', 'string'),
 ('device_id', 'string')]

In [12]:
max_current_cust_id = int(customers_df.select(F.max("customer_id")).collect()[0]['max(customer_id)'])

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [14]:
from faker import Faker
fake = Faker(seed=3)

In [15]:
!pip3 install pandas
import pandas as pd



In [16]:
appends = []
for i in range(5):
    appends.append(fake.profile())

In [17]:
appends_df = pd.DataFrame(appends)[["username", "name", "sex", "mail", "job", "birthdate", "address"]]

In [18]:
import random
import numpy as np

In [19]:
def make_batch_df(max_current_cust_id):
    
    batch_size = random.randint(100, 1000)
    appends = []
    
    for i in range(batch_size):
        appends.append(fake.profile())
        
    appends_df = pd.DataFrame(appends)[["username", "name", "sex", "mail", "job", "birthdate", "address"]]
    appends_df['customer_id'] = list(range(max_current_cust_id, max_current_cust_id+batch_size))
    
    appends_df.rename({'sex': 'gender', 'job': 'position', 'mail':'email'}, axis=1)
    
    return appends_df

In [20]:
make_batch_df(9999)

Unnamed: 0,username,name,sex,mail,job,birthdate,address,customer_id
0,mavila,Rhonda Johnson,F,johnwilliams@yahoo.com,"Therapist, drama",2000-09-12,Unit 3313 Box 5140\nDPO AA 02879,9999
1,kevin36,Carmen Griffin,F,dalemullins@yahoo.com,Trade mark attorney,1946-04-15,"991 Lauren Fort\nMossville, WA 57733",10000
2,jpineda,Allen Torres,M,kevinjenkins@gmail.com,Water engineer,1972-09-09,"3400 Charles Plain Apt. 285\nMorganbury, SC 04316",10001
3,krystalellis,Barbara Mitchell,F,jamesrios@gmail.com,Soil scientist,1908-02-10,"2788 Castillo Fall\nRebeccaview, HI 20055",10002
4,hancockrichard,George Parker,M,imooney@yahoo.com,"Scientist, research (maths)",1996-03-03,"1549 Conway Valleys\nSouth Johnfort, AL 43825",10003
...,...,...,...,...,...,...,...,...
354,qrice,Robert Hernandez,M,brandon09@gmail.com,Waste management officer,1912-05-13,"2263 Tina Springs\nNew Melissa, IN 52637",10353
355,djones,Kristi Henderson,F,isabelhowe@gmail.com,Heritage manager,1911-02-26,"429 Desiree Stream\nNew Jessicashire, SD 68761",10354
356,williamsjustin,Jeffrey Anderson,M,jcordova@yahoo.com,Translator,1937-01-13,"43439 Hodge Motorway\nMillerstad, NC 70033",10355
357,nsilva,Jennifer Jackson,F,ifoley@hotmail.com,Dietitian,1918-05-27,"5876 Brittany Cove\nNguyenbury, DE 38422",10356


In [37]:
ct_hist_df = spark.read.option("header","true").parquet("s3a://go01-demo/cde-workshop/clickthrough/historical")

                                                                                

In [86]:
hist_ded_ids = ct_hist_df.select("device_id").sample(.01).toPandas()

                                                                                

In [87]:
unique_arr = np.unique(hist_ded_ids.device_id)

In [88]:
unique_arr

array(['000032d7', '000070cc', '00038618', ..., 'ffff60f9', 'ffff9249',
       'ffffe321'], dtype=object)

In [90]:
hist_ded_ids = unique_arr[0:10000]
len(hist_ded_ids)

10000

In [32]:
customers_pd_df = customers_df.toPandas()

In [46]:
hist_ded_ids.count()

device_id    10000
dtype: int64

In [95]:
#customers_pd_df = customers_pd_df.drop(["devide_id"])
customers_pd_df["device_id"] = hist_ded_ids

In [98]:
customers_pd_df = customers_pd_df.drop("devide_id", axis=1)

In [100]:
final_spark_df_customers = spark.createDataFrame(customers_pd_df)

In [101]:
final_spark_df_customers.write.mode("overwrite").csv(os.environ["STORAGE"]+'/cde-workshop/clickthrough/customers/data', header=True)

                                                                                

In [102]:
verify_df = spark.read.csv(os.environ["STORAGE"]+'/cde-workshop/clickthrough/customers/data', header=True)

                                                                                

In [103]:
verify_df.show()

+-----------+--------------+--------------------+------+--------------------+--------------------+----------+--------------------+---------+
|customer_id|      username|                name|gender|               email|          occupation| birthdate|             address|device_id|
+-----------+--------------+--------------------+------+--------------------+--------------------+----------+--------------------+---------+
|          1|       robin48|       Jesse Spencer|     M|   udalton@yahoo.com|Pharmacist, commu...|1975-09-24|10305 Scott River...| 000032d7|
|          2|cynthiajackson|     Savannah Daniel|     F|walkerchristopher...|        Set designer|1934-09-28|70884 Andrew Plaz...| 000070cc|
|          3|       ydurham|     Alexander Davis|     M|annlindsey@yahoo.com|Plant breeder/gen...|1975-11-09|0365 Carrie Point...| 00038618|
|          4| murphymichael|      Patrick Cortez|     M| freeves@hotmail.com|Scientist, audiol...|1911-01-20|9864 Brian Walk S...| 000727b1|
|          5|

In [104]:
hist_ded_ids = ct_hist_df.select("device_id").sample(.01).toPandas()

                                                                                

In [105]:
unique_arr2 = np.unique(hist_ded_ids.device_id)

In [107]:
sample_device_ids = [i for i in unique_arr2 if i not in unique_arr]

In [108]:
len(sample_device_ids)

29261

In [113]:
array_df = pd.DataFrame(sample_device_ids, columns=['device_id'])
device_id_spark_df = spark.createDataFrame(array_df)

In [114]:
device_id_spark_df.write.csv(os.environ["STORAGE"]+'/cde-workshop/clickthrough/customers/data/device_ids', header=True)

                                                                                

In [115]:
def make_batch_df(max_current_cust_id):
    
    batch_size = random.randint(100, 1000)
    appends = []
    
    for i in range(batch_size):
        appends.append(fake.profile())
        
    appends_df = pd.DataFrame(appends)[["username", "name", "sex", "mail", "job", "birthdate", "address"]]
    appends_df['customer_id'] = list(range(max_current_cust_id, max_current_cust_id+batch_size))
    
    appends_df.rename({'sex': 'gender', 'job': 'position', 'mail':'email'}, axis=1)
    
    return appends_df

In [121]:
new_batch_pd_df = make_batch_df(max_current_cust_id)
new_batch_pd_df_count = new_batch_pd_df.count()

In [122]:
new_batch_spark_df = spark.createDataFrame(new_batch_pd_df)

In [139]:
new_batch_len = new_batch_pd_df_count[0].item()

In [125]:
in_device_id_spark_df = spark.read.csv(os.environ["STORAGE"]+'/cde-workshop/clickthrough/customers/data/device_ids', header=True)

                                                                                

In [142]:
in_device_slice = in_device_id_spark_df.limit(new_batch_len)

In [148]:
from pyspark.sql.functions import monotonically_increasing_id

In [149]:
in_device_slice = in_device_slice.withColumn("mono_id",monotonically_increasing_id() )

In [150]:
new_batch_spark_df = new_batch_spark_df.withColumn("mono_id",monotonically_increasing_id() )

In [154]:
new_batch_spark_append_df = new_batch_spark_df.join(in_device_slice, 'mono_id').drop('mono_id')

In [156]:
new_batch_spark_append_df.dtypes

[('username', 'string'),
 ('name', 'string'),
 ('sex', 'string'),
 ('mail', 'string'),
 ('job', 'string'),
 ('birthdate', 'date'),
 ('address', 'string'),
 ('customer_id', 'bigint'),
 ('device_id', 'string')]

In [155]:
customers_df.dtypes

[('customer_id', 'string'),
 ('username', 'string'),
 ('name', 'string'),
 ('gender', 'string'),
 ('email', 'string'),
 ('occupation', 'string'),
 ('birthdate', 'string'),
 ('address', 'string')]

In [163]:
customers_df.write.mode("overwrite").saveAsTable('default.CUSTOMERS')

Py4JJavaError: An error occurred while calling o517.saveAsTable.
: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Error in loading storage handler.org.apache.hadoop.hive.kudu.KuduStorageHandler
	at org.apache.hadoop.hive.ql.metadata.Table.getStorageHandler(Table.java:347)
	at org.apache.spark.sql.hive.client.HiveClientImpl.convertHiveTableToCatalogTable(HiveClientImpl.scala:486)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getTableOption$3(HiveClientImpl.scala:445)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getTableOption$1(HiveClientImpl.scala:445)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:319)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:246)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:245)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:299)
	at org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:443)
	at org.apache.spark.sql.hive.client.HiveClient.getTable(HiveClient.scala:90)
	at org.apache.spark.sql.hive.client.HiveClient.getTable$(HiveClient.scala:89)
	at org.apache.spark.sql.hive.client.HiveClientImpl.getTable(HiveClientImpl.scala:92)
	at org.apache.spark.sql.hive.HiveExternalCatalog.getRawTable(HiveExternalCatalog.scala:123)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$getTable$1(HiveExternalCatalog.scala:714)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
	at org.apache.spark.sql.hive.HiveExternalCatalog.getTable(HiveExternalCatalog.scala:714)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.getTable(ExternalCatalogWithListener.scala:138)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:515)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:500)
	at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.loadTable(V2SessionCatalog.scala:65)
	at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:281)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$lzycompute$1(Analyzer.scala:1250)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$1(Analyzer.scala:1250)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1287)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1286)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$15.applyOrElse(Analyzer.scala:1204)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$15.applyOrElse(Analyzer.scala:1167)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1167)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1133)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:197)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:197)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:78)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
	at org.apache.spark.sql.SparkSession.table(SparkSession.scala:600)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:649)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:565)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Error in loading storage handler.org.apache.hadoop.hive.kudu.KuduStorageHandler
	at org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(HiveUtils.java:303)
	at org.apache.hadoop.hive.ql.metadata.Table.getStorageHandler(Table.java:342)
	... 80 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.kudu.KuduStorageHandler
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(HiveUtils.java:298)
	... 81 more


In [None]:
#new_batch_spark_append_df.union(customers_df)