## Run docker dependencies

## Make sure spark notebook is present in docker

In [1]:
# docker run -it -p 8888:8888 -p 4040:4040 --name my_jupyter -v "/:/root" jupyter/pyspark-notebook

# # For coming back to saved work, RUN
# docker start -ai my_jupyter

## make sure docker is running with mongo and postgres

In [2]:
# Make sure mongodb and postgres are running in docker for the code below to work
# docker run -d --name mongodb -p 27017:27017 mongo
# docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=admin -e POSTGRES_PASSWORD=password -e POSTGRES_DB=retail_db postgres

In [3]:
# # Start Jupyter with persistent storage, mount everything at root
## THIS NEEDS TO BE TEST
# docker run -it --rm -p 8888:8888 -p 4040:4040  \
#   -v "/:/root" \
#   jupyter/pyspark-notebook


# # # Start MongoDB & PostgreSQL with persistent storage:
# docker run -d --name mongodb -p 27017:27017 \
#   -v "/mongo_data:/data/db" mongo

# docker run -d --name postgres -p 5432:5432 \
#   -e POSTGRES_USER=admin -e POSTGRES_PASSWORD=password -e POSTGRES_DB=retail_db \
#   -v "/pg_data:/var/lib/postgresql/data" postgres


## install the required dependencies

In [4]:
!pip install pandas pyspark pymongo psycopg2-binary

Collecting pymongo
  Downloading pymongo-4.11.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (22 kB)
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (4.9 kB)
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m219.3 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading pymongo-4.11.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m153.9 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_aarch

## install postgres

In [5]:
# click File-> New -> Terminal
# Run the following in the terminal inside this environment
# wget -P ./ https://jdbc.postgresql.org/download/postgresql-42.5.0.jar

# # TRY THIS NEXT RUN
# !curl -o ./postgresql-42.5.0.jar https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
#  OR
!wget -P ./ https://jdbc.postgresql.org/download/postgresql-42.5.0.jar


--2025-02-14 22:11:42--  https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1046274 (1022K) [application/java-archive]
Saving to: ‘./postgresql-42.5.0.jar’


2025-02-14 22:12:07 (41.5 KB/s) - ‘./postgresql-42.5.0.jar’ saved [1046274/1046274]



# MAIN CODE

In [6]:
import pandas as pd
from pymongo import MongoClient
import psycopg2
from pyspark.sql import SparkSession

# Initialize Spark Session inside Docker with PostgreSQL JDBC driver
# Initialize Spark with proper JDBC driver
spark = SparkSession.builder \
    .appName("RetailAnalysis") \
    .config("spark.driver.extraClassPath", "postgresql-42.5.0.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.5.0.jar") \
    .getOrCreate()
print("Spark Version:", spark.version)

# Load data locally
df = pd.read_csv("marketing_campaign_dataset_head.csv").head(15) # make sure you have this file at the root
print("Columns in dataset:", df.columns)

# 1. Connect to MongoDB (Ensure MongoDB is running in a container)
MONGO_URI = "mongodb://host.docker.internal:27017/"  # Use this inside Docker
client = MongoClient(MONGO_URI)
db = client["retail_db"]
collection = db["marketing_campaign"]

# Clear existing data to avoid duplicates
collection.delete_many({})
print("Old data cleared from MongoDB.")

# Convert DataFrame to dictionary records and insert into MongoDB
collection.insert_many(df.to_dict(orient="records"))
print("Data successfully inserted into MongoDB!")

# 2. Define PostgreSQL Schema
POSTGRES_URI = "jdbc:postgresql://host.docker.internal:5432/retail_db"
postgres_conn = psycopg2.connect(
    dbname="retail_db",
    user="admin",
    password="password",
    host="host.docker.internal",
    port=5432  # Ensure correct port
)
cursor = postgres_conn.cursor()

cursor.execute(
    """
    CREATE TABLE IF NOT EXISTS marketing_campaign (
        id SERIAL PRIMARY KEY,
        customer_id INT,
        amount_spent DECIMAL DEFAULT 0,
        rating DECIMAL DEFAULT 0,
        purchase_date DATE
    );
    """
)
postgres_conn.commit()

# 3. Transfer Data from MongoDB to PostgreSQL
records = collection.find({}, {"_id": 0})
for record in records:
    cursor.execute(
        """
        INSERT INTO marketing_campaign (customer_id, amount_spent, rating, purchase_date)
        VALUES (%s, %s, %s, %s)
        """,
        (
            record.get("Campaign_ID", None),  # Adjusted based on dataset
            record.get("Amount", 0),  # Default to 0 if missing
            record.get("Rating_Score", 0),  # Default to 0 if missing
            record.get("Date", None)  # Keep null if missing
        ),
    )

postgres_conn.commit()
cursor.close()
postgres_conn.close()
print("Data successfully transferred to PostgreSQL!")

# 4. Spark SQL Analysis
df_spark = (
    spark.read.format("jdbc")
    .option("url", POSTGRES_URI)
    .option("dbtable", "marketing_campaign")
    .option("user", "admin")
    .option("password", "password")
    .load()
)

df_spark.createOrReplaceTempView("marketing_campaign")

# Example queries
spark.sql(
    """
    SELECT customer_id, COUNT(*) AS purchases
    FROM marketing_campaign
    GROUP BY customer_id
    ORDER BY purchases DESC
    """
).show()

spark.sql(
    """
    SELECT customer_id, SUM(amount_spent) AS total_spent
    FROM marketing_campaign
    GROUP BY customer_id
    ORDER BY total_spent DESC
    LIMIT 5
    """
).show()

# Convert purchase_date to proper Date format if necessary
df_spark = df_spark.withColumn("purchase_date", df_spark["purchase_date"].cast("date"))

# Export to Parquet
df_spark.write.mode("overwrite").parquet("marketing_campaign.parquet")
print("Data exported to Parquet!")


Spark Version: 3.5.0
Columns in dataset: Index(['Campaign_ID', 'Company', 'Campaign_Type', 'Target_Audience',
       'Duration', 'Channel_Used', 'Conversion_Rate', 'Acquisition_Cost',
       'ROI', 'Location', 'Language', 'Clicks', 'Impressions',
       'Engagement_Score', 'Customer_Segment', 'Date'],
      dtype='object')
Old data cleared from MongoDB.
Data successfully inserted into MongoDB!
Data successfully transferred to PostgreSQL!
+-----------+---------+
|customer_id|purchases|
+-----------+---------+
|         12|        5|
|          1|        5|
|         13|        5|
|          6|        5|
|          3|        5|
|          5|        5|
|         15|        5|
|          9|        5|
|          4|        5|
|          8|        5|
|          7|        5|
|         10|        5|
|         11|        5|
|         14|        5|
|          2|        5|
+-----------+---------+

+-----------+--------------------+
|customer_id|         total_spent|
+-----------+------------------