In [None]:
# Mount Google Drive into the Colab runtime at /content/drive so that files
# stored on the Drive can be opened by path in later cells.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=7782ab3890fb144f06ebccb2ce80644285226f8f19e626d13d857eeff186d4ac
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# Import the standard-library helpers and PySpark APIs used by this notebook
import os
from getpass import getpass

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when
from pyspark.sql.types import StringType

In [None]:
# Build the Spark session for this ETL job (getOrCreate hands back an existing
# session if one is already running in this runtime).
spark = (
    SparkSession.builder
    .appName("ETL for orders information and customer information")
    .getOrCreate()
)

In [None]:
# Step 1: Load Data
# Both CSV exports sit in the same Drive folder and are read with the same
# options, so the folder and the options are declared once instead of being
# repeated in each call. header=True takes column names from the first row;
# inferSchema=True lets Spark derive the column types from the data.
DATA_DIR = "/content/drive/MyDrive/Advanced Data Engineering"
CSV_READ_OPTIONS = {"header": True, "inferSchema": True}

customer_df = spark.read.csv(f"{DATA_DIR}/olist_customers_dataset 2.csv", **CSV_READ_OPTIONS)
order_df = spark.read.csv(f"{DATA_DIR}/olist_orders_dataset 2.csv", **CSV_READ_OPTIONS)

In [None]:
# Preview the customer table: show() prints the first 20 rows and truncates
# long cell values, spelled out here instead of relying on the defaults.
print("All Customers:")
customer_df.show(n=20, truncate=True)

All Customers:
+--------------------+--------------------+------------------------+--------------------+--------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|
+--------------------+--------------------+------------------------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|                    8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|                   13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|                   89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c.

In [None]:
# Preview the order table: show() prints the first 20 rows and truncates
# long cell values, spelled out here instead of relying on the defaults.
print("All Orders:")
order_df.show(n=20, truncate=True)

All Orders:
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b

In [None]:
# Identify duplicate customers: count the rows per customer_id and keep only
# the ids that occur more than once.
customer_id_counts = customer_df.groupBy("customer_id").count()
dup_customer = customer_id_counts.where(col("count") > 1)
print("Duplicate Customers:")
dup_customer.show()

Duplicate Customers:
+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



In [None]:
# Identify duplicate orders: count the rows per order_id and keep only the
# ids that occur more than once.
order_id_counts = order_df.groupBy("order_id").count()
dup_order = order_id_counts.where(col("count") > 1)
print("Duplicate Orders:")
dup_order.show()

Duplicate Orders:
+--------+-----+
|order_id|count|
+--------+-----+
+--------+-----+



In [None]:
# Remove duplicates
# dropDuplicates() with no column list compares whole rows, so only rows that
# are identical in every column are collapsed. The two frames are independent
# of each other; each name is rebound to its de-duplicated frame.
customer_df = customer_df.dropDuplicates()
order_df = order_df.dropDuplicates()

In [None]:
from pyspark.sql.functions import col, sum

In [None]:
# Handle missing values
# Count the NULLs in every column of order_df: isNull() flags each row, the
# cast turns the flag into 0/1, and summing gives one total per column. The
# result is a single-row frame with the same column names as order_df.
# The aggregate is referenced as F.sum so this cell can never pick up Python's
# built-in sum(), which cannot aggregate a Spark Column.
missing_value_counts = order_df.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in order_df.columns]
)
missing_value_counts.show()


+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|order_id|customer_id|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+
|       0|          0|           0|                       0|              160|                        1783|                         2965|                            0|
+--------+-----------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+



In [None]:
# Count the NULLs in every column of customer_df: isNull() flags each row, the
# cast turns the flag into 0/1, and summing gives one total per column.
# The aggregate is referenced as F.sum so this cell can never pick up Python's
# built-in sum(), which cannot aggregate a Spark Column.
missing_value_counts2 = customer_df.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in customer_df.columns]
)
missing_value_counts2.show()

+-----------+------------------+------------------------+-------------+--------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|
+-----------+------------------+------------------------+-------------+--------------+
|          0|                 0|                       0|            0|             0|
+-----------+------------------+------------------------+-------------+--------------+



In [None]:
# Drop every order row that has a NULL in any column.
# NOTE(review): the NULL counts printed for order_df are all in
# order_approved_at and the two order_delivered_* columns, so this presumably
# also removes orders that were never approved or delivered - confirm that is
# the intended population for the join.
order_df_cleaned = order_df.dropna()
order_df_cleaned.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|acce194856392f074...|7e20bf5ca92da6820...|   delivered|     2018-06-04 00:00:13|2018-06-05 00:35:10|         2018-06-05 13:24:00|          2018-06-16 15:20:55|          2018-07-18 00:00:00|
|1d067305b599c1e0d...|0489975a325480c9e...|   delivered|     2018-02-14 13:05:17|2018-02-14 13:15:38|         2018-02-20 20:12:57|          2018-03-09 21:52:36|          2018-03-09 00:00:00|
|6f841dde94727854e...|a9c9532060c9d245f...|  

In [None]:
# Apply the same any-NULL row filter to customer_df so both tables are cleaned
# the same way.
customer_df_cleaned = customer_df.dropna()

In [None]:
# Transformation: join the two tables
# Inner-join the cleaned orders to the cleaned customers on customer_id, so
# only orders with a matching customer row are kept. The join uses
# customer_df_cleaned (not the raw customer_df) so that the NULL filtering
# applied to the customer table actually takes effect in the result.
joined_df = order_df_cleaned.join(customer_df_cleaned, on="customer_id", how="inner")
joined_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------+--------------+
|         customer_id|            order_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|  customer_unique_id|customer_zip_code_prefix| customer_city|customer_state|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+--------------------+------------------------+--------------+--------------+
|7e20bf5ca92da6820...|acce194856392f074...|   delivered|     2018-06-04 00:00:13|2018-06-05 00:35:10|         2018-06-05 13:24:00|          2018-06-16 15:20:55|          2018-07-18 00:00:00|576ea0cab

In [None]:
# Stop Spark session
# After this call the DataFrames created above can no longer be evaluated;
# re-run the session-creation cell before using Spark again.
spark.stop()

In [None]:
###Part B task 2

!pip install pymongo

Collecting pymongo
  Downloading pymongo-4.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.6.1-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.6.1-py3-none-any.whl (307 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m307.7/307.7 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.6.1 pymongo-4.8.0


In [None]:
from pymongo import MongoClient
import pandas as pd

In [None]:
# Connect to the MongoDB cluster.
# The connection string carries the account's username and password, so it is
# never written into the notebook: it is read from the MONGODB_URI environment
# variable, or typed at a hidden prompt when that variable is unset or empty.
# SECURITY: the password that was previously hard-coded in this cell must be
# treated as leaked and rotated.
mongodb_uri = os.environ.get("MONGODB_URI") or getpass("MongoDB connection URI: ")
client = MongoClient(mongodb_uri)

# Handles to the database and collection used below; MongoDB creates them
# lazily on the first write.
db = client.GDDA707_db
collection = db.assessment2_collection

In [None]:
# upload file from local
# Ask the user to upload a file from their machine into the Colab runtime;
# the value returned by files.upload() is kept in `uploaded`.
from google.colab import files
uploaded = files.upload()


Saving sales_transactions (1).csv to sales_transactions (1).csv


In [None]:
# read file
# Load the uploaded sales transactions CSV into a pandas DataFrame.
# NOTE(review): the "(1)" in the file name is the name the file was saved under
# at upload time; if it is uploaded under a different name this path must be
# updated to match.
df = pd.read_csv('sales_transactions (1).csv')

In [None]:
# insert dataframe to mongo db
# Each DataFrame row becomes one document. insert_many() rejects an empty
# list, so an empty DataFrame is reported instead of raising. Only the number
# of inserted documents is printed; echoing the full InsertManyResult would
# dump every generated ObjectId into the notebook output.
# NOTE: this cell appends - running it again inserts the same rows a second time.
records = df.to_dict('records')
if records:
    insert_result = collection.insert_many(records)
    print(f"Inserted {len(insert_result.inserted_ids)} documents")
else:
    print("No rows to insert")

InsertManyResult([ObjectId('66b43b92537df2f3d8981730'), ObjectId('66b43b92537df2f3d8981731'), ObjectId('66b43b92537df2f3d8981732'), ObjectId('66b43b92537df2f3d8981733'), ObjectId('66b43b92537df2f3d8981734'), ObjectId('66b43b92537df2f3d8981735'), ObjectId('66b43b92537df2f3d8981736'), ObjectId('66b43b92537df2f3d8981737'), ObjectId('66b43b92537df2f3d8981738'), ObjectId('66b43b92537df2f3d8981739'), ObjectId('66b43b92537df2f3d898173a'), ObjectId('66b43b92537df2f3d898173b'), ObjectId('66b43b92537df2f3d898173c'), ObjectId('66b43b92537df2f3d898173d'), ObjectId('66b43b92537df2f3d898173e'), ObjectId('66b43b92537df2f3d898173f'), ObjectId('66b43b92537df2f3d8981740'), ObjectId('66b43b92537df2f3d8981741'), ObjectId('66b43b92537df2f3d8981742'), ObjectId('66b43b92537df2f3d8981743'), ObjectId('66b43b92537df2f3d8981744'), ObjectId('66b43b92537df2f3d8981745'), ObjectId('66b43b92537df2f3d8981746'), ObjectId('66b43b92537df2f3d8981747'), ObjectId('66b43b92537df2f3d8981748'), ObjectId('66b43b92537df2f3d89817

In [None]:
# Find the transactions whose amount is strictly greater than 200.
high_value_filter = {"amount": {"$gt": 200}}
results = collection.find(high_value_filter)

# Print each matching document on its own line.
for result in results:
    print(result)

{'_id': ObjectId('66b43b92537df2f3d8981730'), 'transaction_id': 1, 'customer_id': 102, 'product_id': 'P1', 'timestamp': '2023-08-13 21:21:00', 'amount': 479.68}
{'_id': ObjectId('66b43b92537df2f3d8981732'), 'transaction_id': 3, 'customer_id': 101, 'product_id': 'P3', 'timestamp': '2023-08-25 10:24:00', 'amount': 249.55}
{'_id': ObjectId('66b43b92537df2f3d8981735'), 'transaction_id': 6, 'customer_id': 102, 'product_id': 'P6', 'timestamp': '2023-06-27 01:55:00', 'amount': 303.38}
{'_id': ObjectId('66b43b92537df2f3d8981737'), 'transaction_id': 8, 'customer_id': 103, 'product_id': 'P8', 'timestamp': '2023-02-19 23:17:00', 'amount': 355.26}
{'_id': ObjectId('66b43b92537df2f3d8981739'), 'transaction_id': 10, 'customer_id': 101, 'product_id': 'P10', 'timestamp': '2023-08-06 19:51:00', 'amount': 336.65}
{'_id': ObjectId('66b43b92537df2f3d898173c'), 'transaction_id': 13, 'customer_id': 103, 'product_id': 'P13', 'timestamp': '2023-08-19 07:35:00', 'amount': 395.69}
{'_id': ObjectId('66b43b92537d