In [21]:
import pandas as pd
from google.cloud import bigquery
from pyspark.sql import SparkSession
from pyspark import SparkConf

In [2]:
client = bigquery.Client()

# Customers data

In [50]:
customer_df = pd.read_csv("../data/customers.csv")

In [51]:
customer_df.head()

Unnamed: 0,email,customer_id,country
0,mblodxnyamqhdsvo@yahoo.com,4da8f229-d382-40e9-9638-4149ac20043a,JP
1,andhkxhbapsvjs@hotmail.com,5731aca0-8432-46fc-9d90-2727e6ead179,JP
2,xbmdggthhulure@outlook.com,af76472a-15ca-43c8-8a7d-bd4c85ba94aa,US
3,ogsyrodfyzmuxpekgv@gmail.com,c47932af-883b-4926-879a-656293a1c9cd,DE
4,dpvvimwrbvdcmh@aol.com,adeecb49-5e5c-41a0-abc0-3095bcbdfcb9,JP


In [52]:
customer_df.shape

(1000000, 3)

In [53]:
customer_df.columns

Index(['email', 'customer_id', 'country'], dtype='object')

In [54]:
customer_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Data columns (total 3 columns):
 #   Column       Non-Null Count    Dtype 
---  ------       --------------    ----- 
 0   email        1000000 non-null  object
 1   customer_id  1000000 non-null  object
 2   country      1000000 non-null  object
dtypes: object(3)
memory usage: 22.9+ MB


In [55]:
customer_df.describe()

Unnamed: 0,email,customer_id,country
count,1000000,1000000,1000000
unique,1000000,1000000,5
top,mblodxnyamqhdsvo@yahoo.com,4da8f229-d382-40e9-9638-4149ac20043a,FR
freq,1,1,201045


## Load customers data to BigQuery

In [30]:
job_config = bigquery.LoadJobConfig(
    clustering_fields=["country"],
    write_disposition="WRITE_TRUNCATE",
)

In [32]:
job = client.load_table_from_dataframe(
    customer_df, "datapipeline.raw_car.customers", job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

LoadJob<project=panalyt-datapipeline, location=asia-southeast1, id=225b1b2c-38e9-47f9-8d92-f4172e777ddf>

In [33]:
client.get_table("datapipeline.raw_car.customers").num_rows

1000000

# Hardware sales data

In [3]:
hardware_df = pd.read_excel("../data/hardware_sales.xlsx")

In [4]:
hardware_df.head()

Unnamed: 0,email,order_id,revenue,timestamp
0,ellcsqjyxbsoujwbp@outlook.com,EB192F6893DCBF76766D5310,19.99,1558831217
1,jeszbtnjpankz@icloud.com,46AE0481B392C9FF723483BD,39.99,1636228626
2,nxefbqahwuchvnzfdj@protonmail.com,318971F3FD4D0A84C2BB853B,49.99,1652954805
3,nlpgeqlhetla@aol.com,46B36B2797A614C2E09C35B9,49.99,1644129781
4,eizngzzeosylxslyfr@outlook.com,AC924E89C2FAF19F354B85E0,49.99,1669238899


In [5]:
hardware_df.shape

(358301, 4)

In [6]:
hardware_df.columns

Index(['email', 'order_id', 'revenue', 'timestamp'], dtype='object')

In [7]:
hardware_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 358301 entries, 0 to 358300
Data columns (total 4 columns):
 #   Column     Non-Null Count   Dtype  
---  ------     --------------   -----  
 0   email      358301 non-null  object 
 1   order_id   358301 non-null  object 
 2   revenue    358301 non-null  float64
 3   timestamp  358301 non-null  int64  
dtypes: float64(1), int64(1), object(2)
memory usage: 10.9+ MB


In [8]:
hardware_df.describe()

Unnamed: 0,revenue,timestamp
count,358301.0,358301.0
mean,36.627715,1614266000.0
std,12.47796,39184520.0
min,19.99,1546298000.0
25%,19.99,1580331000.0
50%,39.99,1614262000.0
75%,49.99,1648212000.0
max,49.99,1682035000.0


In [9]:
hardware_df['timestamp'] = pd.to_datetime(hardware_df['timestamp'], unit='s')

In [10]:
hardware_df

Unnamed: 0,email,order_id,revenue,timestamp
0,ellcsqjyxbsoujwbp@outlook.com,EB192F6893DCBF76766D5310,19.99,2019-05-26 00:40:17
1,jeszbtnjpankz@icloud.com,46AE0481B392C9FF723483BD,39.99,2021-11-06 19:57:06
2,nxefbqahwuchvnzfdj@protonmail.com,318971F3FD4D0A84C2BB853B,49.99,2022-05-19 10:06:45
3,nlpgeqlhetla@aol.com,46B36B2797A614C2E09C35B9,49.99,2022-02-06 06:43:01
4,eizngzzeosylxslyfr@outlook.com,AC924E89C2FAF19F354B85E0,49.99,2022-11-23 21:28:19
...,...,...,...,...
358296,cdilhsempyzs@gmail.com,4D3DAC566F7D96B483DE8C71,39.99,2022-12-29 18:02:18
358297,tjtfwqhykwcyctffy@protonmail.com,49D720C9FF6A092188B566E8,19.99,2019-11-25 06:32:37
358298,mlrhpavilioofe@hotmail.com,F371D9FDA91D7C536A953437,19.99,2022-10-24 19:33:39
358299,uofmkdtjrqnvvpavr@gmail.com,6F9F24F63A5EA2E27D07C575,49.99,2019-10-20 11:47:51


In [11]:
hardware_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 358301 entries, 0 to 358300
Data columns (total 4 columns):
 #   Column     Non-Null Count   Dtype         
---  ------     --------------   -----         
 0   email      358301 non-null  object        
 1   order_id   358301 non-null  object        
 2   revenue    358301 non-null  float64       
 3   timestamp  358301 non-null  datetime64[ns]
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 10.9+ MB


## Load Hardware sales data to BigQuery

In [33]:
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("timestamp", bigquery.enums.SqlTypeNames.TIMESTAMP),
#         bigquery.SchemaField("revenue", bigquery.enums.SqlTypeNames.NUMERIC),
    ],
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="timestamp",
    ),
    write_disposition="WRITE_TRUNCATE",
)

In [34]:
job = client.load_table_from_dataframe(
    hardware_df, "datapipeline.raw_car.hardware_sales", job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

LoadJob<project=panalyt-datapipeline, location=asia-southeast1, id=5fe65d19-175e-438d-a848-23702c0866c1>

In [35]:
client.get_table("datapipeline.raw_car.hardware_sales").num_rows

358301

# Subscription events data

In [38]:
subscription_df = pd.read_json("../data/subscription_events.json", lines=True)

In [39]:
subscription_df.head()

Unnamed: 0,event_type,order_id,timestamp,customer_id,revenue
0,subscription_cancelled,00001DC603F9FB448E64,2021-01-22 22:43:14,,
1,subscription_created,00001DC603F9FB448E64,2020-06-14 16:14:32,255fc076-74d7-400f-957f-62b4be673ffc,49.99
2,subscription_renewed,000036CF5E74CC075FA0,2021-09-14 20:29:40,,
3,subscription_created,000036CF5E74CC075FA0,2019-09-08 04:22:46,fc340fd7-36d7-41ba-bcb2-8adb0ace904d,36.99
4,subscription_renewed,000036CF5E74CC075FA0,2022-09-15 14:35:56,,


In [40]:
subscription_df.shape

(711502, 5)

In [42]:
subscription_df.columns

Index(['event_type', 'order_id', 'timestamp', 'customer_id', 'revenue'], dtype='object')

In [43]:
subscription_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 711502 entries, 0 to 711501
Data columns (total 5 columns):
 #   Column       Non-Null Count   Dtype         
---  ------       --------------   -----         
 0   event_type   711502 non-null  object        
 1   order_id     711502 non-null  object        
 2   timestamp    711502 non-null  datetime64[ns]
 3   customer_id  273004 non-null  object        
 4   revenue      273004 non-null  float64       
dtypes: datetime64[ns](1), float64(1), object(3)
memory usage: 27.1+ MB


In [45]:
subscription_df.describe()

Unnamed: 0,timestamp,revenue
count,711502,273004.0
mean,2021-09-05 10:44:50.628348928,28.009648
min,2018-12-31 23:01:45,9.99
25%,2020-11-12 18:26:01,14.99
50%,2021-11-07 23:31:49,25.99
75%,2022-08-19 20:40:31.249999872,36.99
max,2023-04-20 23:57:39,49.99
std,,12.552157


## Load subscription events data to BigQuery

In [46]:
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("timestamp", bigquery.enums.SqlTypeNames.TIMESTAMP),
    ],
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="timestamp",
    ),
    write_disposition="WRITE_TRUNCATE",
)

In [48]:
job = client.load_table_from_dataframe(
    subscription_df, "datapipeline.raw_car.subscription_events", job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

LoadJob<project=panalyt-datapipeline, location=asia-southeast1, id=e95a91ca-fa05-4866-b34e-d131d2b3c21e>

In [49]:
client.get_table("datapipeline.raw_car.subscription_events").num_rows

711502

# Spark

In [22]:
conf = SparkConf()
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", False)
conf.set("spark.sql.broadcastTimeout", "2400")

<pyspark.conf.SparkConf at 0x14ce01510>

In [23]:
spark = (
    SparkSession.builder.config(conf=conf)
    .appName("spark-jobs")
    .enableHiveSupport()
    .getOrCreate()
)

23/05/10 10:13:10 WARN Utils: Your hostname, MayM1.local resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
23/05/10 10:13:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/10 10:13:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [25]:
log4j = spark._jvm.org.apache.log4j
logger = log4j.LogManager.getLogger(__name__)

In [26]:
subscription_sdf = spark.read.json("../data/subscription_events.json")

                                                                                

In [30]:
subscription_sdf.show()

+--------------------+--------------------+--------------------+-------+----------+
|         customer_id|          event_type|            order_id|revenue| timestamp|
+--------------------+--------------------+--------------------+-------+----------+
|                null|subscription_canc...|00001DC603F9FB448E64|   null|1611355394|
|255fc076-74d7-400...|subscription_created|00001DC603F9FB448E64|  49.99|1592151272|
|                null|subscription_renewed|000036CF5E74CC075FA0|   null|1631651380|
|fc340fd7-36d7-41b...|subscription_created|000036CF5E74CC075FA0|  36.99|1567916566|
|                null|subscription_renewed|000036CF5E74CC075FA0|   null|1663252556|
|                null|subscription_renewed|000036CF5E74CC075FA0|   null|1599684839|
|                null|subscription_renewed|000068E0C2382E9D8C98|   null|1649159012|
|e6a51f87-00d6-485...|subscription_created|000068E0C2382E9D8C98|  49.99|1585666788|
|                null|subscription_renewed|000068E0C2382E9D8C98|   null|1681

In [31]:
subscription_sdf.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- timestamp: long (nullable = true)

