<a href="https://colab.research.google.com/github/vieiralc/spark/blob/main/pySparkExercices.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=76ef304820eb38440e7c24fcc83449fc9906190c44a648e54e31540406a70c4e
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


# Start a simple spark session

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark_exercices').getOrCreate()

# Load the Walmart Stock CSV file, have spark infer the data types

In [11]:
df = spark.read.csv('/content/sample_data/6_walmart_stock.csv', inferSchema=True, header=True)
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



# What are the column names?

In [21]:
print('The columns are')
print('---------')
for column in df.columns:
  print(column)
  print('---------')

The columns are
---------
Date
---------
Open
---------
High
---------
Low
---------
Close
---------
Volume
---------
Adj Close
---------


In [9]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

# What does the Schema looks like?

In [10]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



# Print the first 5 columns and 5 rows

In [12]:
columns = df.columns[:5]
df.select(columns).show(5)

+----------+------------------+---------+---------+------------------+
|      Date|              Open|     High|      Low|             Close|
+----------+------------------+---------+---------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18|
+----------+------------------+---------+---------+------------------+
only showing top 5 rows



# Use describe to learn about the data frame

In [35]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

# There are too many decimal places for mean and stddev in the describe dataframe. Format the numbers to just show up two decimal places.

In [17]:
from pyspark.sql.functions import format_number

In [38]:
desc = df.describe()
desc.select('summary',
            format_number(desc['Open'].cast('Float'), 2).alias('Open'),
            format_number(desc['High'].cast('Float'), 2).alias('High'),
            format_number(desc['Low'].cast('Float'), 2).alias('Low'),
            format_number(desc['Close'].cast('Float'), 2).alias('Close'),
            format_number(desc['Volume'].cast('Float'), 2).alias('Volume'),
            format_number(desc['Adj Close'].cast('Float'), 2).alias('Adj Close')).show()

+-------+--------+--------+--------+--------+-------------+---------+
|summary|    Open|    High|     Low|   Close|       Volume|Adj Close|
+-------+--------+--------+--------+--------+-------------+---------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00| 1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.50|    67.24|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,781.00|     6.72|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|    50.36|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,096.00|    84.91|
+-------+--------+--------+--------+--------+-------------+---------+



# Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day

In [43]:
new_df = df.withColumn('HV Ratio', df['High']/df['Volume'])
new_df.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|            HV Ratio|
+----------+------------------+------------------+------------------+------------------+--------+------------------+--------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|4.819714653321546E-6|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|6.290848613094555E-6|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|4.669412994783916E-6|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|7.367338463826307E-6|
|2012-01-09|         59.029999|   

# What day had the Peak High in Price?

In [52]:
df.orderBy(df['High'].desc()).head(1)[0]['Date']

datetime.date(2015, 1, 13)

# What is the mean of the close column

In [53]:
from pyspark.sql.functions import mean
df.select(mean('close')).show()

+-----------------+
|       avg(close)|
+-----------------+
|72.38844998012726|
+-----------------+



# What is the max and min for Volume column?

In [55]:
from pyspark.sql.functions import max, min
df.select(max('Volume'), min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



# How many days was the Close lower than 60 dollars?

In [59]:
df[df['Close'] < 60].count()

81

# What percentage of the time was the High greater than 80 dollars? In other words, (Number of days High > 80)/(total days in data set)

In [67]:
total_days = df.count()
high_days_greater_than_80 = df[df['High'] > 80].count()
percentage = (high_days_greater_than_80 / total_days)*100
print(round(percentage, 2), "%")


9.14 %


# What is the Pearson correlation between High and Volume?

In [68]:
df.corr('High', 'Volume')

-0.3384326061737161

# What is the max high per year?

In [None]:
from pyspark.sql.functions import year

In [36]:
df_year = df.withColumn('Year', year(df['Date']))
# df_year.groupBy('Year').max('High').show()
df_year.groupBy('Year').max('High').select('Year', format_number('max(High)', 2).alias('Max High')).show()

+----+--------+
|Year|Max High|
+----+--------+
|2015|   90.97|
|2013|   81.37|
|2014|   88.09|
|2012|   77.60|
|2016|   75.19|
+----+--------+



# What is the Close average for each Month? In other words, across all the years, what is the average for Close price. Ex: Jan, Fev, Mar, etc... Your result will have a value for each of these months

In [37]:
from pyspark.sql.functions import month

In [50]:
df_month = df.withColumn('Month', month(df['Date']))
df_month.groupBy('Month').avg('Close').select(
    'Month',
    format_number('avg(Close)', 2).alias('Close Average')
).show()

+-----+-------------+
|Month|Close Average|
+-----+-------------+
|   12|        72.85|
|    1|        71.45|
|    6|        72.50|
|    3|        71.78|
|    5|        72.31|
|    9|        72.18|
|    4|        72.97|
|    8|        73.03|
|    7|        74.44|
|   10|        71.58|
|   11|        72.11|
|    2|        71.31|
+-----+-------------+



# Great Job!