### Install required packages

In [1]:
import os

#Install mariadb-connector-C before proceeding here.

os.environ["PYSPARK_HADOOP_VERSION"] = "3"

!pip install pyspark==3.5.1
!pip install findspark==2.0.1
!pip install pandas==2.2.2
!pip install mariadb==1.1.10
!pip install kafka-python==2.0.2
!pip install redis==5.0.7

#In
#Set the following environment variables
# PYSPARK_PYTHON="python"

#JAVA_HOME to Java 17
#Add JAVA_HOME to PATH

#For Windows only
#  1. Copy the hadoop folder under "exercise-files" to c:\hadoop
#  2. Add an environment variable HADOOP_HOME=c:\hadoop
#  3. Add c:\hadoop\bin to PATH
#  4. Close the notebook and the command prompt. Reopen command prompt, restart notebook.


Collecting pyspark==3.5.1
  Using cached pyspark-3.5.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark==3.5.1)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.1
Collecting findspark==2.0.1
  Using cached findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pandas==2.2.2
  Using cached pandas-2.2.2-cp311-cp311-macosx_11_0_arm64.whl.metadata (19 kB)
Collecting numpy>=1.23.2 (from pandas==2.2.2)
  Using cached numpy-2.0.1-cp311-cp311-macosx_14_0_arm64.whl.metadata (60 kB)
Collecting tzdata>=2022.7 (from pandas==2.2.2)
  Using cached tzdata-2024.1-py2.py3-none-any.whl.metadata (1.4 kB)
Using cached pandas-2.2.2-cp311-cp311-macosx_11_0_arm64.whl (11.3 MB)
Us

### Setup MariaDB

In [5]:
import mariadb

#Connect to mariadb as root user
root_conn = mariadb.connect(
                user="root",
                password="spark",
                host="127.0.0.1",
                port=3306,
                database="mysql"
            )

db_cursor = root_conn.cursor()
db_cursor.execute("SHOW DATABASES")
db_databases = db_cursor.fetchall()

db_list=[]
for database in db_databases:
    db_list.append(database[0])
    
print("Databases available :", db_list)

#Create warehouse_stock database & item_stock table
if ( "warehouse_stock" in db_list):
    print("warehouse_stock DB already exists.")
else:
    print("Going to create warehouse_stock DB")
    #Create DB
    db_cursor.execute("CREATE DATABASE warehouse_stock")
    #Create table
    db_cursor.execute("""
        CREATE TABLE `warehouse_stock`.`item_stock` (
               `ID` INT NULL AUTO_INCREMENT,
               `STOCK_DATE` DATETIME NOT NULL, 
               `WAREHOUSE_ID` VARCHAR(45) NOT NULL,
               `ITEM_NAME` VARCHAR(45) NOT NULL,
               `OPENING_STOCK` INT NOT NULL DEFAULT 0,
               `RECEIPTS` INT  NOT NULL DEFAULT 0,
               `ISSUES` INT  NOT NULL DEFAULT 0,
               `UNIT_VALUE` DECIMAL(10,2)  NOT NULL DEFAULT 0,
               PRIMARY KEY (`ID`),
               INDEX `STOCK_DATE` (`STOCK_DATE` ASC));""")
    
    #Grant privileges to user spark
    db_cursor.execute("GRANT ALL PRIVILEGES ON warehouse_stock.* to 'spark'@'%'")
    db_cursor.execute("FLUSH PRIVILEGES")
    print("warehouse_stock DB successfully created")

#Create global_stock database & item_stock table
if ( "global_stock" in db_list):
    print("global_stock DB already exists.")
else:
    print("Going to create global_stock DB")
    #Create DB
    db_cursor.execute("CREATE DATABASE global_stock")
    #Create table
    db_cursor.execute("""
        CREATE TABLE `global_stock`.`item_stock` (
                `ID` INT NULL AUTO_INCREMENT,
                `STOCK_DATE` DATETIME NOT NULL,
                `ITEM_NAME` VARCHAR(45) NOT NULL,
                `TOTAL_REC` INT NOT NULL DEFAULT 0,
                `OPENING_STOCK` INT NOT NULL DEFAULT 0,
                `RECEIPTS` INT  NOT NULL DEFAULT 0,
                `ISSUES` INT  NOT NULL DEFAULT 0,
                `CLOSING_STOCK` INT NOT NULL DEFAULT 0,
                `CLOSING_VALUE` DECIMAL(10,2)  NOT NULL DEFAULT 0,
                PRIMARY KEY (`ID`),
                INDEX `STOCK_DATE` (`STOCK_DATE` ASC));""")
    
    #Grant privileges to user spark
    db_cursor.execute("GRANT ALL PRIVILEGES ON global_stock.* to 'spark'@'%'")
    db_cursor.execute("FLUSH PRIVILEGES")
    print("globak_stock DB successfully created")

#Create website_stats database & visit_stats table
if ( "website_stats" in db_list):
    print("website_stats DB already exists.")
else:
    print("Going to create website_stats DB")
    #Create DB
    db_cursor.execute("CREATE DATABASE website_stats")
    #Create table
    db_cursor.execute("""
       CREATE TABLE `website_stats`.`visit_stats` (
                `ID` int(11) NOT NULL AUTO_INCREMENT, 
                `INTERVAL_TIMESTAMP` DATETIME DEFAULT NULL,
                `LAST_ACTION` varchar(45) DEFAULT NULL,
                `DURATION` int(10) DEFAULT NULL, 
                PRIMARY KEY (`ID`));""")
    
    #Grant privileges to user spark
    db_cursor.execute("GRANT ALL PRIVILEGES ON website_stats.* to 'spark'@'%'")
    db_cursor.execute("FLUSH PRIVILEGES")
    print("website_stats DB successfully created")


#Check if tables are created
db_cursor.execute("""
    SELECT TABLE_SCHEMA, TABLE_NAME
    FROM information_schema.tables
    WHERE table_schema IN ('warehouse_stock','global_stock','website_stats')
    """)



print("\nSchema and Tables:\n-----------------------------")
for schema, table in db_cursor:
    print(schema," : ", table)

    

Databases available : ['information_schema', 'mysql', 'performance_schema', 'spark_de', 'sys']
Going to create warehouse_stock DB
warehouse_stock DB successfully created
Going to create global_stock DB
globak_stock DB successfully created
Going to create website_stats DB
website_stats DB successfully created

Schema and Tables:
-----------------------------
global_stock  :  item_stock
website_stats  :  visit_stats
warehouse_stock  :  item_stock


### Setup Kafka Topics

In [6]:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaConsumer

consumer=KafkaConsumer(
                group_id="setup",
                bootstrap_servers="localhost:9092" )

if "spark.streaming.website.visits" not in consumer.topics():

    admin_client = KafkaAdminClient(
                    bootstrap_servers="localhost:9092", 
                    client_id='spark-de')
    
    topic_list = [
        NewTopic(name="spark.streaming.website.visits", 
                 num_partitions=1, replication_factor=1),
        NewTopic(name="spark.streaming.carts.abandoned", 
                 num_partitions=1, replication_factor=1),
        NewTopic(name="spark.exercise.lastaction.long", 
                 num_partitions=1, replication_factor=1)
    ]

    print("Creating topics...")
    result=admin_client.create_topics(new_topics=topic_list, validate_only=False)

print("Topics available now :\n--------------------------------")
for topic in consumer.topics():
    print(topic)


Creating topics...
Topics available now :
--------------------------------
spark.exercise.lastaction.long
spark.streaming.carts.abandoned
spark.streaming.website.visits


### Create a raw data folder to represent a distributed file system

In [7]:
import os
os.makedirs("./raw_data",exist_ok=True)

In [8]:
#Check if JAVA_HOME is set to Java 1.17
print(os.environ["JAVA_HOME"])

#Check if HADOOP_HOME is set, needed for windows only
#print(os.environ["HADOOP_HOME"])

#Check if JAVA_HOME & HADOOP_HOME (windows only) are in the PATH
print(os.environ["PATH"])

/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home
/opt/anaconda3/envs/spark/bin:/opt/anaconda3/condabin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/opt/homebrew/bin:/usr/local/Homebrew/bin:/opt/stdlibs/homebrew
