## Check System Version & Import Libraries

In [1]:
import sys
import os
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
print(sys.executable)

/usr/local/opt/python@3.9/bin/python3.9


In [2]:
# Jupyer Notebook Theme: jt -t monokai -f fira -fs 10 -nf ptsans -nfs 11 -N -kl -cursw 2 -cursc r -cellw 95% -T
# Check python version
cdPyVer = 0x600
cdSysVer = sys.version[:5]
print("%s%s" %("My python version is: ", cdSysVer)) # strangely it works only on 3.7.4 and not 3.7.6
from tqdm.auto import tqdm

# Import libraries
import pandas as pd
import numpy as np
import warnings

import matplotlib.pyplot as plt
from matplotlib import style
plt.style.use('seaborn-dark') # use this theme as you are using dark theme in the notebook
import seaborn as sns

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
warnings.filterwarnings('ignore')

My python version is: 3.9.7


## Acquire Data

#### Ingest data from Bigquery using Pandas

In [3]:
# Filters in Query
sys.path.append('/Users/Shandeep/qc_data_insights/utils/') # git clone this repository qc_data_insights
import utils as u 
import os
from google.oauth2 import service_account
# pip install google-cloud-bigquery==1.24.0 # requires this version.. current version is bugged with pandas
from google.cloud import bigquery

google_credentials_path = os.path.expanduser('~')+'/Documents/google_cloud_data-insights-team.json' # make sure add json file credentials in documents folder
credentials = service_account.Credentials.from_service_account_file(google_credentials_path)
bqclient = bigquery.Client(credentials=credentials, project=credentials.project_id)
print(f'connected to BigQuery, project: {bqclient.project} on version {bigquery.__version__}')

connected to BigQuery, project: dh-darkstores-live on version 1.24.0


In [29]:
print("Which global_entity_id do you want to filter for?")
global_entity_id = 'FP_SG' # press 'shift' and 'enter' on cell to re-enter global_entity_id


# Write query 
query =f"""
SELECT global_entity_id
,   DATE(order_placed_localtime_at) AS order_date
,   order_id
,   analytical_customer_id AS customer_id
,   items.sku
,   items.product.product_name AS product
,   items.qty_sold
,   items.value_euro.unit_price_paid_eur AS unit_price
FROM `fulfillment-dwh-production.cl_dmart.customer_orders`, UNNEST(items) AS items
WHERE DATE(order_placed_localtime_at) < CURRENT_DATE()
AND global_entity_id = '{global_entity_id}'
AND is_dmart 
AND is_failed = FALSE 
AND is_cancelled = FALSE
AND analytical_customer_id IS NOT NULL 
AND order_id IS NOT NULL
AND DATE_TRUNC(order_placed_localtime_at, MONTH) > DATE_SUB(CURRENT_DATE(), INTERVAL 1 MONTH)
AND DATE_TRUNC(order_placed_localtime_at, MONTH) <= CURRENT_DATE()
"""

# Read query and display in the form of a pandas dataframe
df = u.read_bigquery(query, bqclient, parse_dates=[], verbose=True)
df.head()

Which global_entity_id do you want to filter for?
running query... job done, downloading... 

Downloading:   0%|          | 0/1202034 [00:00<?, ?rows/s]

done with shape (1202034, 8)


Unnamed: 0,global_entity_id,order_date,order_id,customer_id,sku,product,qty_sold,unit_price
0,FP_SG,2021-10-13,n7no-etyp,dpiF5eG6VfqQtWyFx5XjDg,5654Q1,Suntory Strong Zero 9% Alcohol (Peach) | 350 ml,6.0,3.834804
1,FP_SG,2021-10-13,n7no-x9ce,8lV2KegXXrKqNDY7OST32A,CM0WJ7,Anchor Strong 490ml,6.0,2.556536
2,FP_SG,2021-10-13,n7no-vd9i,aoGa6NVvVqC4XqDcMtGcjQ,L4MGB1,Kingfisher Premium Extra Strong Lager Beer Can...,5.0,2.556536
3,FP_SG,2021-10-13,n7no-p9bj,znWNHJ85X-ud7g8B0oS2WQ,L4MGB1,Kingfisher Premium Extra Strong Lager Beer Can...,5.0,2.556536
4,FP_SG,2021-10-13,n7no-erqs,KRk2eqMAUMya62_W04kleA,P6P5B6,Dasani 1.5L,12.0,0.575221


#### Pandas dataframe to Spark dataframe

### Using Spark in Python
---
The first step in using Spark is connecting to a cluster.

In practice, the cluster will be hosted on a remote machine that's connected to all other nodes. There will be one computer, called the master that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called worker. The master sends the workers data and calculations to run, and they send their results back to the master.


Creating the connection is as simple as creating an instance of the SparkContext class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to.

An object holding all these attributes can be created with the SparkConf() constructor. Take a look at the documentation for all the details!

In [40]:
# from  pyspark library import 
# SparkSession
from pyspark.sql import SparkSession
  
# Building the SparkSession and name
# it :'pandas to spark'
spark = SparkSession.builder.appName(
  "pandas to spark").getOrCreate()

# Verify Spark Context
print(spark)
print()
# Print Spark version
print(spark.version)
  
# create DataFrame
df = spark.createDataFrame(df)
  
df.show(2)

<pyspark.sql.session.SparkSession object at 0x12e881af0>

3.1.2
+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+
|global_entity_id|order_date| order_id|         customer_id|   sku|             product|            qty_sold|        unit_price|
+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+
|           FP_SG|2021-10-13|n7no-etyp|dpiF5eG6VfqQtWyFx...|5654Q1|Suntory Strong Ze...|6.000000000000000000|3.8348043003495427|
|           FP_SG|2021-10-13|n7no-x9ce|8lV2KegXXrKqNDY7O...|CM0WJ7| Anchor Strong 490ml|6.000000000000000000|2.5565362002330283|
+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+
only showing top 2 rows



21/10/19 18:22:01 WARN TaskSetManager: Stage 38 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.


In [41]:
print("Total Records in dataset", df.count())

21/10/19 18:22:01 WARN TaskSetManager: Stage 39 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.
[Stage 39:>                                                         (0 + 4) / 4]

Total Records in dataset 1202034


                                                                                

In [42]:
print('Data overview')
df.printSchema()
print('Columns overview')
pd.DataFrame(df.dtypes, columns = ['Column Name','Data type'])

Data overview
root
 |-- global_entity_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- sku: string (nullable = true)
 |-- product: string (nullable = true)
 |-- qty_sold: decimal(38,18) (nullable = true)
 |-- unit_price: double (nullable = true)

Columns overview


Unnamed: 0,Column Name,Data type
0,global_entity_id,string
1,order_date,date
2,order_id,string
3,customer_id,string
4,sku,string
5,product,string
6,qty_sold,"decimal(38,18)"
7,unit_price,double


Put some Spark in your data
In the last exercise, you saw how to move data from Spark to pandas. However, maybe you want to go the other direction, and put a pandas DataFrame into a Spark cluster! The SparkSession class has a method for this as well.

The `.createDataFrame()` method takes a pandas DataFrame and returns a Spark DataFrame.

The output of this method is stored locally, not in the SparkSession catalog. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts.

For example, a SQL query (using the `.sql()` method) that references your DataFrame will throw an error. To access the data in this way, you have to save it as a temporary table.

You can do this using the `.createTempView()` Spark DataFrame method, which takes as its only argument the name of the temporary table you'd like to register. This method registers the DataFrame as a table in the catalog, but as this table is temporary, it can only be accessed from the specific SparkSession used to create the Spark DataFrame.

There is also the method .createOrReplaceTempView(). This safely creates a new temporary table if nothing was there before, or updates an existing table if one was already defined. You'll use this method to avoid running into problems with duplicate tables.

In [43]:
# Examine tables in the catalog
print(spark.catalog.listTables())

# Add df_spark to the catalog
df.createOrReplaceTempView("df")
print(spark.catalog.listTables())

[Table(name='df_spark', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='df', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='df_spark', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


### Spark SQL

In [44]:
spark.sql("SELECT order_id FROM df WHERE qty_sold > 5").show(10)

+---------+
| order_id|
+---------+
|n7no-etyp|
|n7no-x9ce|
|n7no-erqs|
|n7no-exnx|
|n7no-tr7x|
|n7no-ya8d|
|n7no-qn3q|
|n7no-jc94|
|n7no-mmek|
|n7no-mmek|
+---------+
only showing top 10 rows



21/10/19 18:22:03 WARN TaskSetManager: Stage 46 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.


In [46]:
# Create column called "total sum"
df = df.withColumn("total_sum", df.qty_sold * df.unit_price)
df.show(2)

+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+------------------+
|global_entity_id|order_date| order_id|         customer_id|   sku|             product|            qty_sold|        unit_price|         total_sum|
+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+------------------+
|           FP_SG|2021-10-13|n7no-etyp|dpiF5eG6VfqQtWyFx...|5654Q1|Suntory Strong Ze...|6.000000000000000000|3.8348043003495427|23.008825802097256|
|           FP_SG|2021-10-13|n7no-x9ce|8lV2KegXXrKqNDY7O...|CM0WJ7| Anchor Strong 490ml|6.000000000000000000|2.5565362002330283| 15.33921720139817|
+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+------------------+
only showing top 2 rows



21/10/19 18:22:33 WARN TaskSetManager: Stage 47 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.


In [47]:
# Filter orders with qty_sold > 5
df.filter("qty_sold > 5").show(10)

+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+------------------+
|global_entity_id|order_date| order_id|         customer_id|   sku|             product|            qty_sold|        unit_price|         total_sum|
+----------------+----------+---------+--------------------+------+--------------------+--------------------+------------------+------------------+
|           FP_SG|2021-10-13|n7no-etyp|dpiF5eG6VfqQtWyFx...|5654Q1|Suntory Strong Ze...|6.000000000000000000|3.8348043003495427|23.008825802097256|
|           FP_SG|2021-10-13|n7no-x9ce|8lV2KegXXrKqNDY7O...|CM0WJ7| Anchor Strong 490ml|6.000000000000000000|2.5565362002330283| 15.33921720139817|
|           FP_SG|2021-10-13|n7no-erqs|KRk2eqMAUMya62_W0...|P6P5B6|         Dasani 1.5L|12.00000000000000...|0.5752206450524314|6.9026477406291775|
|           FP_SG|2021-10-13|n7no-exnx|p-9Fu7viXzGXAB0iM...|L4MGB1|Kingfisher Premiu...|6.000000000000000000|2.5

21/10/19 18:22:37 WARN TaskSetManager: Stage 48 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.


In [48]:
df.select("global_entity_id","customer_id").show(2)

+----------------+--------------------+
|global_entity_id|         customer_id|
+----------------+--------------------+
|           FP_SG|dpiF5eG6VfqQtWyFx...|
|           FP_SG|8lV2KegXXrKqNDY7O...|
+----------------+--------------------+
only showing top 2 rows



21/10/19 18:22:40 WARN TaskSetManager: Stage 49 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.


In [50]:
# Find the min quantity sold on 8th august
df.filter(df.order_date == '2021-10-08').groupBy().min("qty_sold").show()

21/10/19 18:22:46 WARN TaskSetManager: Stage 50 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.
[Stage 50:>                                                         (0 + 4) / 4]

+-------------+
|min(qty_sold)|
+-------------+
|        0E-18|
+-------------+





In [51]:
# Total sum for order_date 08-10
df.filter(df.order_date == '2021-10-08').groupBy().sum("total_sum").show()

21/10/19 18:22:52 WARN TaskSetManager: Stage 52 contains a task of very large size (25702 KiB). The maximum recommended task size is 1000 KiB.
[Stage 52:>                                                         (0 + 4) / 4]

+--------------+
|sum(total_sum)|
+--------------+
|           NaN|
+--------------+



                                                                                