# ISM6562 Final Exam - Programming Portion - Spring 2023

In this programming portion of the exam, you will use PySpark to analyze a dataset. The dataset is provided in the file `transactions.csv`. The dataset contains transaction information for each sale at a given company.

Instructions:
 1. Rename this file to include your name. For example, if your name is John Smith, rename this file to "John_Smith.ipynb"
 2. Complete the programming portion of the exam below.
    1. There are 4 sections (7 questions in total).
    2. Notice the time estimate provided for each section. You should use this as a guide to help you manage your time.
 3. Submit your completed Jupyter Notebook file to Canvas along with the data your analyzed.

The code you write should be well documented and easy to read. You should use comments to explain your code. 

The code required to complete this exam is very similar to the example PySpark code provided in the course. You can refer to this code as well as your own code stored on your computer or in your github repo.

**IMPORTANT***: You can use Jupyter Notebook, Jupyter Lab, or VS Code. Though you can also use VS Code. You must not have CodePilot, ChatGP, or any other plugins/software that provides code suggestions or auto-completion. You must write all of the code yourself.

**IMPORTANT: You should not share your exam with anyone else. Doing so will be considered a violation of the USF Academic Misconduct Policy.**

Good luck!

Start The Spark section

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession;

spark = SparkSession.builder.master("local[4]").appName("ISM6562 Spark App01").enableHiveSupport().getOrCreate();

# Let's get the SparkContext object. It's the entry point to the Spark API. It's created when you create a sparksession
sc = spark.sparkContext  

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

# It's best if you find that the port number displayed below is not 4040, then you should shut down all other spark sessions and 
# run this code again. If you don't, you may have trouble accessing the data in the spark-warehouse directory.

Spark Session WebUI Port: 4040


In [2]:
spark

In [3]:
# this will set the log level to ERROR. This will hide the INFO or WARNING messages that are printed out by default. If you want to see them, set this to INFO or WARN.
sc.setLogLevel("ERROR")

## 1.0 Load data - 0.5 pts (estimated time <5 minutes)

Load the data you have been given (see question in canvas) into a pyspark dataframe.

In [4]:
# insert code here
trans_df=spark.read.csv(r"C:\Users\rmura\BigData\week 8\transactions.csv",header=True, inferSchema=True)
trans_df.show()


+--------------------+----------+--------+-----------+-----------+----------+---------+--------------------+--------------+-----+--------------------+----------------+--------------------+--------------------+----------+--------+
|      transaction_id|      date|    time|day_of_week|customer_id|first_name|last_name|               email|         state|  zip|              street|            city|               phone|             product|sale_price|returned|
+--------------------+----------+--------+-----------+-----------+----------+---------+--------------------+--------------+-----+--------------------+----------------+--------------------+--------------------+----------+--------+
|294e63f6-c6d3-439...|2018-07-27|05:13:37|     Friday|      19710|    Amanda|    Weber|brichardson@examp...|          Iowa|62736|     292 Jones Hills|    South Joseph|  (497)740-6101x1146|Total grid-enable...|     61.18|   false|
|46882d27-7d12-4e1...|2021-09-11|13:57:48|   Saturday|      31066| Frederick|   

In [5]:
trans_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- product: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- returned: boolean (nullable = true)



## 2.0 Transform Data - total 1 pts (estimated time <10 minutes)

Conduct any data tramsformations necessary (i.e. data cleaning, data type conversions, etc.)

In [6]:
from pyspark.sql.functions import datediff,date_format,to_date,to_timestamp
import pyspark.sql.functions as f
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
 
# insert code here
trans_df=trans_df.withColumn('date',to_date(trans_df.transaction_id, 'dd/MM/yyyy'))

In [7]:
trans_df.createOrReplaceTempView("trans_tempView")

In [8]:
tables=spark.sql("show tables").show()

+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|         |trans_tempview|       true|
+---------+--------------+-----------+



## 3.0 Explore Data - total 3 pts (estimated time <15 minutes)

### 3.1 What day of the week has the highest number of sales? (1.5 pts)

In [9]:
%load_ext sparksql_magic

In [10]:

%%sparksql
select  * from trans_tempview limit 2

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
transaction_id,date,time,day_of_week,customer_id,first_name,last_name,email,state,zip,street,city,phone,product,sale_price,returned
294e63f6-c6d3-4392-9991-29513874fbbd,,05:13:37,Friday,19710,Amanda,Weber,brichardson@example.org,Iowa,62736,292 Jones Hills,South Joseph,(497)740-6101x1146,Total grid-enabled Graphical User Interface,61.18,False
46882d27-7d12-4e1d-8aab-929fba924945,,13:57:48,Saturday,31066,Frederick,Hart,coxjessica@example.com,Wyoming,3254,76490 White Cove,Charlesfort,1396078600,Networked fault-tolerant standardization,137.2,True


In [11]:
dg=trans_df.sort("sale_price",ascending=False).show(1)


+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+------------+-----+--------------------+-----------------+-------------------+--------------------+----------+--------+
|      transaction_id|date|    time|day_of_week|customer_id|first_name|last_name|               email|       state|  zip|              street|             city|              phone|             product|sale_price|returned|
+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+------------+-----+--------------------+-----------------+-------------------+--------------------+----------+--------+
|922efe47-36e0-49e...|null|12:19:02|    Tuesday|      40999| Stephanie|    Combs|spearstammy@examp...|South Dakota|28110|56764 Joshua Village|South Dawnchester|+1-673-405-4676x100|Switchable optimi...|    147.83|    true|
+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+----------

we got high sales from table in week of Tuesady 

### 3.2 Which state has the highest total dollar value of sales? (1.5 pts)

In [12]:
# insert code here
trans_df.groupBy('state',)
spark.sql(" select sum(sale_price) as total_price,state from trans_tempview group by state order by total_price desc limit 5").show(1)


+------------------+-------+
|       total_price|  state|
+------------------+-------+
|4067.4700000000007|Arizona|
+------------------+-------+
only showing top 1 row



we got arizon as highest total dollars

## 4.0 Model the data - 5.5 total pts (estimated time <30 minutes)

### 4.1 Develop Predictive Model (2.5 pts)

Develop a predictive model that will predict if a sale will result in a return. The input variables/features for this model are sale price, discount, and day of week. Split your data into a train/test split of .7/.3

In [13]:
# insert code here
train_data,test_data=trans_df.randomSplit([0.7,0.3])
train_data.show(3)

+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+--------+-----+--------------------+------------+------------------+--------------------+----------+--------+
|      transaction_id|date|    time|day_of_week|customer_id|first_name|last_name|               email|   state|  zip|              street|        city|             phone|             product|sale_price|returned|
+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+--------+-----+--------------------+------------+------------------+--------------------+----------+--------+
|0070e01b-0d75-4f6...|null|18:27:52|     Monday|      23312|   Michael|    Evans|  troy48@example.org|Illinois|34660|45501 Robert Caus...|Lake Anntown| 878-741-7795x9438|Sharable content-...|     89.67|    true|
|00d35181-cf15-402...|null|10:40:02|  Wednesday|      98465|     Brian|   Larson|darrenbenton@exam...|    Iowa|69243|8628 Butler Park ...|  Port Megan|(

In [14]:
train_data.select

<bound method DataFrame.select of DataFrame[transaction_id: string, date: date, time: string, day_of_week: string, customer_id: int, first_name: string, last_name: string, email: string, state: string, zip: int, street: string, city: string, phone: string, product: string, sale_price: double, returned: boolean]>

In [15]:
dfg=train_data.select(['day_of_week','sale_price'])
dfg.show(3)

+-----------+----------+
|day_of_week|sale_price|
+-----------+----------+
|     Monday|     89.67|
|  Wednesday|     33.14|
|     Sunday|     101.3|
+-----------+----------+
only showing top 3 rows



In [16]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [17]:
day_of_week_4 = StringIndexer(inputCol='day_of_week',outputCol='day_of_week_3',handleInvalid='keep')

In [18]:
from pyspark.ml.feature import VectorAssembler
# Vector assembler is used to create a vector of input features
 
assembler = VectorAssembler(
    inputCols=[
        "day_of_week_3"
    ],
    outputCol="features"
)

In [19]:
pipe = Pipeline(stages=[
    day_of_week_4,
    assembler
    ]
    
)

In [20]:
fitted_pipe=pipe.fit(train_data)

In [21]:
train_data=fitted_pipe.transform(train_data)
train_data.show()

+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+-------------+-----+--------------------+---------------+--------------------+--------------------+----------+--------+-------------+--------+
|      transaction_id|date|    time|day_of_week|customer_id|first_name|last_name|               email|        state|  zip|              street|           city|               phone|             product|sale_price|returned|day_of_week_3|features|
+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+-------------+-----+--------------------+---------------+--------------------+--------------------+----------+--------+-------------+--------+
|0070e01b-0d75-4f6...|null|18:27:52|     Monday|      23312|   Michael|    Evans|  troy48@example.org|     Illinois|34660|45501 Robert Caus...|   Lake Anntown|   878-741-7795x9438|Sharable content-...|     89.67|    true|          0.0|   [0.0]|
|00d35181-cf15-402..

In [22]:
from pyspark.ml.regression import LinearRegression

lr_model = LinearRegression(labelCol="sale_price")
fit_model = lr_model.fit(train_data.select(['features','sale_price']))

In [23]:
results = fit_model.transform(train_data)
results.show()

+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+-------------+-----+--------------------+---------------+--------------------+--------------------+----------+--------+-------------+--------+-----------------+
|      transaction_id|date|    time|day_of_week|customer_id|first_name|last_name|               email|        state|  zip|              street|           city|               phone|             product|sale_price|returned|day_of_week_3|features|       prediction|
+--------------------+----+--------+-----------+-----------+----------+---------+--------------------+-------------+-----+--------------------+---------------+--------------------+--------------------+----------+--------+-------------+--------+-----------------+
|0070e01b-0d75-4f6...|null|18:27:52|     Monday|      23312|   Michael|    Evans|  troy48@example.org|     Illinois|34660|45501 Robert Caus...|   Lake Anntown|   878-741-7795x9438|Sharable content-...|     89.67

### 4.2 Evaluate Predictive Model (1.5 pts)

Evaluate the model using a test data set.

In [27]:
test_d = fit_model.transform(test_data)
test_d.show()

IllegalArgumentException: features does not exist. Available: transaction_id, date, time, day_of_week, customer_id, first_name, last_name, email, state, zip, street, city, phone, product, sale_price, returned

In [26]:
test_results = fit_model.evaluate(test_data)

IllegalArgumentException: features does not exist. Available: transaction_id, date, time, day_of_week, customer_id, first_name, last_name, email, state, zip, street, city, phone, product, sale_price, returned

In [None]:
test_results.residuals.show()

In [None]:
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'Ex Var:':7s} {test_results.explainedVariance:>7.3f}")
print(f"{'MAE:':7s} {test_results.meanAbsoluteError:>7.3f}")
print(f"{'MSE:':7s} {test_results.meanSquaredError:>7.3f}")
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'R2:':7s} {test_results.r2:>7.3f}")

### 4.2 Prediction (1.5 pts)

Use the model you developed to predict if the following sales will result in a return:

| Sales Price | Discount | Day of Week |
|-------------|----------|-------------|
| 100         | 0.0      | 1           |
| 200         | 0.1      | 2           |
| 300         | 0.1      | 3           |
| 400         | 0.3      | 4           |
| 500         | 0.2      | 5           |

In [None]:
# Insert Code Here
import pandas as pd

df=pd.DataFrame()


In [None]:
spark.stop()

# Analaysis

- First we download the data nd read the data using pyspark 
- after reading the data we need to check the data properties
- and run required quires based on requirmnet 
- vector assemle and piplein ethe data 
