## Import modules

In [8]:
import findspark

findspark.init()

import pyspark
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd

In [9]:
spark = SparkSession.builder.appName("SSMS to PySpark").getOrCreate()

## Connecting with SSMS

In [10]:
def connect_ssms():
    conn = pyodbc.connect("Driver={SQL Server Native Client 11.0};"
                          "Server=105857-001L\SQLEXPRESS;"
                          "Database=Retail;"
                          "Trusted_Connection=yes;")
    return conn

def customers(conn):
    query = "SELECT * FROM customers"
    df_pandas_customers = pd.read_sql(query, conn)
    df_customers = spark.createDataFrame(df_pandas_customers)
    return df_customers

def order_items(conn):
    query = "SELECT * FROM order_items"
    df_pandas_order_items = pd.read_sql(query, conn)
    df_order_items = spark.createDataFrame(df_pandas_order_items)
    return df_order_items

def orders(conn):
    query = "SELECT * FROM orders"
    df_pandas_orders = pd.read_sql(query, conn)
    df_orders = spark.createDataFrame(df_pandas_orders)
    return df_orders

In [11]:
conn = connect_ssms()

In [12]:
df_customers = customers(conn)
df_order_items = order_items(conn)
df_orders = orders(conn)

  df_pandas_customers = pd.read_sql(query, conn)
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:
  df_pandas_order_items = pd.read_sql(query, conn)
  df_pandas_orders = pd.read_sql(query, conn)


In [15]:
df_customers.limit(5).show(truncate=False)

+-----------+--------------+--------------+--------------+-----------------+-----------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street        |customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+-----------------------+-------------+--------------+----------------+
|1          |Richard       |Hernandez     |XXXXXXXXX     |XXXXXXXXX        |6303 Heather Plaza     |Brownsville  |TX            |78521           |
|2          |Mary          |Barrett       |XXXXXXXXX     |XXXXXXXXX        |9526 Noble Embers Ridge|Littleton    |CO            |80126           |
|3          |Ann           |Smith         |XXXXXXXXX     |XXXXXXXXX        |3422 Blue Pioneer Bend |Caguas       |PR            |725             |
|4          |Mary          |Jones         |XXXXXXXXX     |XXXXXXXXX        |8324 Little Common     |San Marcos   |CA  

In [22]:
# Number of rows
num_rows = df_customers.count()
print("Number of rows:", num_rows)

# Number of columns
num_columns = len(df_customers.columns)
print("Number of columns:", num_columns)

Number of rows: 12435
Number of columns: 9


In [16]:
df_order_items.limit(5).show(truncate=False)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|28087        |11236              |1014                 |2                  |99.96              |49.98                   |
|28088        |11237              |627                  |3                  |119.97             |39.99                   |
|28089        |11238              |502                  |5                  |250.0              |50.0                    |
|28090        |11239              |502                  |3                  |150.0              |50.0                    |
|28091        |11241              |1014                 |3                  |149.94             |49.98                   |
+-------------+-

In [23]:
# Number of rows
num_rows = df_order_items.count()
print("Number of rows:", num_rows)

# Number of columns
num_columns = len(df_order_items.columns)
print("Number of columns:", num_columns)

Number of rows: 172198
Number of columns: 6


In [17]:
df_orders.limit(5).show(truncate=False)

+--------+---------------------+-----------------+---------------+
|order_id|order_date           |order_customer_id|order_status   |
+--------+---------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00.0|11599            |CLOSED         |
|2       |2013-07-25 00:00:00.0|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00.0|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00.0|8827             |CLOSED         |
|5       |2013-07-25 00:00:00.0|11318            |COMPLETE       |
+--------+---------------------+-----------------+---------------+



In [24]:
# Number of rows
num_rows = df_orders.count()
print("Number of rows:", num_rows)

# Number of columns
num_columns = len(df_orders.columns)
print("Number of columns:", num_columns)

Number of rows: 68883
Number of columns: 4


## Inner join between customers and orders table

In [26]:
customers_orders_df = df_customers.join(df_orders, on=df_customers['customer_id'] == df_orders['order_customer_id'])

In [27]:
customers_orders_df.limit(5).show(truncate=False)

+-----------+--------------+--------------+--------------+-----------------+---------------------------+-------------+--------------+----------------+--------+---------------------+-----------------+--------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street            |customer_city|customer_state|customer_zipcode|order_id|order_date           |order_customer_id|order_status  |
+-----------+--------------+--------------+--------------+-----------------+---------------------------+-------------+--------------+----------------+--------+---------------------+-----------------+--------------+
|9993       |Mary          |Scott         |XXXXXXXXX     |XXXXXXXXX        |4148 Round Parkway         |Broken Arrow |OK            |74012           |154     |2013-07-26 00:00:00.0|9993             |ON_HOLD       |
|5925       |Megan         |Bennett       |XXXXXXXXX     |XXXXXXXXX        |4411 Cinder Front          |Hempstead    |NY            |11550  

In [28]:
# Number of rows
num_rows = customers_orders_df.count()
print("Number of rows:", num_rows)

# Number of columns
num_columns = len(customers_orders_df.columns)
print("Number of columns:", num_columns)

Number of rows: 68883
Number of columns: 13


In [29]:
customers_orders_df.selectExpr("customer_id", "order_id", "order_date", "order_status").orderBy('customer_id').show(10)

+-----------+--------+--------------------+---------------+
|customer_id|order_id|          order_date|   order_status|
+-----------+--------+--------------------+---------------+
|          1|   22945|2013-12-13 00:00:...|       COMPLETE|
|         10|   45239|2014-05-01 00:00:...|       COMPLETE|
|         10|   56133|2014-07-15 00:00:...|       COMPLETE|
|        100|    6641|2013-09-05 00:00:...|       COMPLETE|
|        100|   15045|2013-10-28 00:00:...|     PROCESSING|
|        100|   22395|2013-12-09 00:00:...|       CANCELED|
|        100|   28477|2014-01-16 00:00:...|       COMPLETE|
|        100|   54995|2014-07-08 00:00:...|       COMPLETE|
|        100|   62907|2014-02-06 00:00:...|PENDING_PAYMENT|
|        100|   64426|2014-04-06 00:00:...|        PENDING|
+-----------+--------+--------------------+---------------+
only showing top 10 rows



In [30]:
from pyspark.sql.functions import struct

In [32]:
customers_orders_df.select('customer_id',struct('order_id','order_date','order_status').alias('order_details')).orderBy('customer_id').show(10)

+-----------+--------------------+
|customer_id|       order_details|
+-----------+--------------------+
|          1|{22945, 2013-12-1...|
|         10|{45239, 2014-05-0...|
|         10|{56133, 2014-07-1...|
|        100|{6641, 2013-09-05...|
|        100|{15045, 2013-10-2...|
|        100|{22395, 2013-12-0...|
|        100|{28477, 2014-01-1...|
|        100|{54995, 2014-07-0...|
|        100|{62907, 2014-02-0...|
|        100|{64426, 2014-04-0...|
+-----------+--------------------+
only showing top 10 rows



In [33]:
customer_order_struct=customers_orders_df.select('customer_id',struct('order_id','order_date','order_status').alias('order_details'))

In [34]:
from pyspark.sql.functions import collect_list

In [35]:
df_final = customer_order_struct.groupBy('customer_id').agg(collect_list('order_details').alias('order_details')).orderBy('customer_id')

In [37]:
df_final.show(1, truncate=False)

+-----------+------------------------------------------+
|customer_id|order_details                             |
+-----------+------------------------------------------+
|1          |[{22945, 2013-12-13 00:00:00.0, COMPLETE}]|
+-----------+------------------------------------------+
only showing top 1 row



In [None]:
#df_final.coalesce(1).write.json('final')

## Denormalization for all 3 tables

In [44]:
# joining the tables
customer_full_details = df_customers.\
join(df_orders, df_customers['customer_id']==df_orders['order_customer_id']).\
join(df_order_items, df_orders['order_id']==df_order_items['order_item_order_id'])

In [45]:
customer_full_details.show(3, truncate=False)

+-----------+--------------+--------------+--------------+-----------------+-----------------+-------------+--------------+----------------+--------+---------------------+-----------------+------------+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street  |customer_city|customer_state|customer_zipcode|order_id|order_date           |order_customer_id|order_status|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-----------+--------------+--------------+--------------+-----------------+-----------------+-------------+--------------+----------------+--------+---------------------+-----------------+------------+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|3872       |Lisa          |

In [50]:
denorm_df=customer_full_details. \
select('customer_id','customer_fname','customer_lname','order_id','order_date','order_status',struct('order_item_id','order_item_product_id','order_item_subtotal').alias('order_item_details')). \
groupBy('customer_id','customer_fname','customer_lname','order_id','order_date','order_status'). \
agg(collect_list('order_item_details').alias('order_item_details')). \
orderBy('customer_id'). \
select('customer_id','customer_fname','customer_lname',struct('order_id','order_date','order_status','order_item_details').alias('order_details')). \
groupBy('customer_id','customer_fname','customer_lname'). \
agg(collect_list('order_details').alias('order_details')). \
orderBy('customer_id')

In [51]:
denorm_df.show(3)

+-----------+--------------+--------------+--------------------+
|customer_id|customer_fname|customer_lname|       order_details|
+-----------+--------------+--------------+--------------------+
|          1|       Richard|     Hernandez|[{22945, 2013-12-...|
|         10|       Melissa|         Smith|[{45239, 2014-05-...|
|        100|        George|       Barrett|[{54995, 2014-07-...|
+-----------+--------------+--------------+--------------------+
only showing top 3 rows

