# Juan Hun 345 SBA 

In [56]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, cast, max, mean, min, count

In [82]:
spark = SparkSession.builder \
            .appName('capstone-proj') \
            .getOrCreate()

df_abc = spark.read.option("multiLine", True).option("header", True).csv('./apache_spark_data/CompanyABC_stock.csv')

### 1.1  Load/Read the CompanyABC Stock (CompanyABC stock.csv) data into the SparkSQL DataFrame [ hint read() ].

In [12]:

df_abc.columns
df_abc.printSchema()
df_abc.describe().show()
df_abc.show()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)

+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      NULL| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|      NULL|  6.76809024470826|6.768186808159218|6.744075756255496|6.75685916

### 1.2 print first five rows

In [13]:
df_abc.show(5)
# or df.head(5) but I prefer show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



### 1.3 Create a new DataFrame column called “HV Ratio,” which will stimulate the ratio of the High price versus the total Volume of stock that was traded for a day.

In [None]:
df_abc = df_abc.withColumn(
    "HV Ratio",
    col("High").cast("float") / col("volume").cast("float")
)

df_abc.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+---------+---------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|4.819714682786927E-6|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|6.290848662516662E-6|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539| 4.66941298944916E-6|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922| 7.36733843444859E-6|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|8.915604814435727E-6|
+----------+------------------+---------

### 1.4: Find out on what day the stock price was the highest. (Hint: use the High column.)

In [46]:
df_abc.agg(max(col("High").cast("float")).alias("Max High")).show()


+--------+
|Max High|
+--------+
|   90.97|
+--------+



### 1.5: What is the average (mean) closing price? (Hint: Use the Close column.)

In [47]:
df_abc.agg(mean(col("Close").cast("float")).alias("Mean Close")).show()


+-----------------+
|       Mean Close|
+-----------------+
|72.38844997363553|
+-----------------+



### 1.6: What are the maximum and minimum volumes of stock traded? (Hint: Use the Volume column.).

In [55]:
df_abc.agg(max(col("Volume").cast("int")).alias("Max Volume")).show()

df_abc.agg(min(col("Volume").cast("int")).alias("Min Volume")).show()

+----------+
|Max Volume|
+----------+
|  80898100|
+----------+

+----------+
|Min Volume|
+----------+
|   2094900|
+----------+



### 1.7: For how many days was the closing value less than 70 dollars? (Hint: Use the count() method.)

In [73]:
df_abc.filter(col("Close").cast("float") < 70).count()

397

### 1.8: What percentage of the time was the High greater than 80 dollars?

In [96]:
high_above_80_count = df_abc.filter(col("High").cast("float") > 80).count()
total_count = df_abc.count()

percentage = (high_above_80_count / total_count) * 100
print(f"Percentage of rows with High > 80: {percentage}%")

Percentage of rows with High > 80: 9.141494435612083%


### 1.9: Create a database named CompanyABC_DB using SQL (Workbench). 

In [80]:
import mysql.connector as db_connection

conn = db_connection.connect(
            host = "localhost",
            user = "root",
            password = "password"
        )

cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS CompanyABC_DB")
cursor.execute("USE CompanyABC_DB")

### 1.10: Load/Write CompanyABC stock.csv file data into CompanyABC_DB database from the SparkSQL Dataframe. You can specify any table name for that file.

In [81]:
url = "jdbc:mysql://localhost:3306/CompanyABC_DB" 
properties = {
    "user": "root",  
    "password": "password",  
    "driver": "com.mysql.cj.jdbc.Driver"
}
df_abc.write.jdbc(url=url, table="CompanyABC_Table", mode="append", properties=properties)

# Section Two

### 2.1: Load/Read both CompanyABC sales datasets (Sales_April_2019.csv and Sales_February_2019.csv) into a single SparkSQL DataFrame ( hint read() ).

In [100]:
df_combined = spark.read.option("multiLine", True).option("header", True).csv([
    './apache_spark_data/Sales_April_2019.csv',
    './apache_spark_data/Sales_February_2019.csv'
])
df_combined.show()

+--------+--------------------+----------------+----------+---------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|     Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+---------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95| 4/19/2019 8:46|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99| 4/7/2019 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|       600|4/12/2019 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|4/12/2019 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99| 4/30/2019 9:27|333 8th St, Los A...|
|  176562|USB-C Charging Cable|               1|     11.95|4/29/2019 13:03|381 Wilson St, Sa...|
|  176563|Bose SoundSport H...|               1|     99.99|  4/2/2019 7:46|668 Center St, Se...|
|  176564|USB-C Charging Cable

### 2.2. If you use the above command on the sales dataset, you will notice that each Order has “Price Each” and “Quantity Ordered” columns, but the “Total Price” is missing.

Now, create a new Spark DataFrame column called “Total price” and find the “Total Price” of the Order for the combined sales file as shown in the screenshot.


In [102]:
df_combined = df_combined.withColumn(
        "Total Price",
        col("Quantity Ordered").cast("int") * col("Price Each").cast("float")
    )

df_combined.show()

+--------+--------------------+----------------+----------+---------------+--------------------+-----------+
|Order ID|             Product|Quantity Ordered|Price Each|     Order Date|    Purchase Address|Total Price|
+--------+--------------------+----------------+----------+---------------+--------------------+-----------+
|  176558|USB-C Charging Cable|               2|     11.95| 4/19/2019 8:46|917 1st St, Dalla...|       23.9|
|  176559|Bose SoundSport H...|               1|     99.99| 4/7/2019 22:30|682 Chestnut St, ...|      99.99|
|  176560|        Google Phone|               1|       600|4/12/2019 14:38|669 Spruce St, Lo...|      600.0|
|  176560|    Wired Headphones|               1|     11.99|4/12/2019 14:38|669 Spruce St, Lo...|      11.99|
|  176561|    Wired Headphones|               1|     11.99| 4/30/2019 9:27|333 8th St, Los A...|      11.99|
|  176562|USB-C Charging Cable|               1|     11.95|4/29/2019 13:03|381 Wilson St, Sa...|      11.95|
|  176563|Bose Soun

### 2.3: Load/Write sales data from the SparkSQL DataFrame into CompanyABC_DB database. You can specify any name for the table. Remember, “Total price” must be recorded. 

In [105]:
df_combined.write.jdbc(url=url, table="Combined_Sales", mode="append", properties=properties)