### Importing the required libraries

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from IPython.display import display, HTML
from pyspark.sql.types import FloatType

display(HTML('<style>pre { white-space: pre !important; }</style>'))

## Warming up

In [13]:
def read_csv(path):
    spark = SparkSession.builder.appName("readCSV").getOrCreate()
    df = spark.read.csv(path, header=True)
    return df


FILE_PATH = "stocks.csv"
df = read_csv(path=FILE_PATH)

### Preprocessing the data

In [14]:
def round_df(df, r):
    string_columns = ["Date"]
    numeric_columns = [col for col in df.columns if col not in string_columns]
    rounded_df = df.select(
        *(string_columns + [F.round(F.col(c), r).alias(c) for c in numeric_columns]))
    return rounded_df


df = round_df(df=df, r=2)

In [15]:
def task1(df):
    df.show()


def task2(df):
    df.printSchema()


def task3(df):
    filtered_df = df.filter(df["Close"] < 500)
    filtered_df.select("Open", "Close", "Volume").show()


def task4(df):
    filtered_df = df.filter((df["Close"] < 200) & (df["Open"] > 200))
    filtered_df.show()


def task5(df):
    df = df.withColumn("Year", F.split(F.col("Date"), "-")[0])
    df = df.withColumn('Year', F.col('Year').cast(FloatType()))

    return df


def task6(df):
    min_volume_by_year_df = df.groupBy("Year").agg(
        F.min("Volume").alias("Min Volume"))
    min_volume_by_year_df = min_volume_by_year_df.orderBy("Year")
    return min_volume_by_year_df


def task7(df):
    df = df.withColumn("Year", F.split(F.col("Date"), "-")[0])
    df = df.withColumn("Month", F.split(F.col("Date"), "-")[1])

    max_low_by_year_month_df = df.groupBy("Year", "Month").agg(
        F.max("Low").alias("Max Low Price"))
    max_low_by_year_month_df = max_low_by_year_month_df.orderBy(
        "Year", "Month")
    max_low_by_year_month_df.show(df.count())


def task8(df):
    mean_value = df.select(F.mean(df["High"]).alias("mean")).first()[0]
    std_deviation = df.select(F.stddev(df["High"]).alias("stddev")).first()[0]
    print("Mean:", round(mean_value, 2))
    print("Standard Deviation:", round(std_deviation, 2))

In [16]:
task2(df=df)

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Adj Close: double (nullable = true)



In [17]:

task3(df=df)

+------+------+----------+
|  Open| Close|    Volume|
+------+------+----------+
|213.43|214.01|1.234324E8|
| 214.6|214.38|1.504762E8|
|214.38|210.97|  1.3804E8|
|211.75|210.58|1.192828E8|
| 210.3|211.98|1.119027E8|
| 212.8|210.11|1.155574E8|
|209.19|207.72|1.486149E8|
|207.87|210.65| 1.51473E8|
|210.11|209.43|1.082235E8|
|210.93|205.93|1.485169E8|
|208.33|215.04|1.825019E8|
|214.91|211.73|1.530382E8|
|212.08|208.07|1.520386E8|
|206.78|197.75|2.204419E8|
|202.51|203.07|2.664249E8|
|205.95|205.94|4.667775E8|
|206.85|207.88|4.306421E8|
|204.93|199.29|2.933756E8|
|201.08|192.06|3.114881E8|
|192.37|194.73|1.874691E8|
+------+------+----------+
only showing top 20 rows



In [18]:

task4(df=df)

+----------+------+-----+------+------+----------+---------+
|      Date|  Open| High|   Low| Close|    Volume|Adj Close|
+----------+------+-----+------+------+----------+---------+
|2010-01-22|206.78|207.5|197.16|197.75|2.204419E8|    25.62|
|2010-01-28|204.93|205.5| 198.7|199.29|2.933756E8|    25.82|
|2010-01-29|201.08|202.2|190.25|192.06|3.114881E8|    24.88|
+----------+------+-----+------+------+----------+---------+



In [19]:

df = task5(df=df)

In [20]:
task6(df=df).show()

+------+----------+
|  Year|Min Volume|
+------+----------+
|2010.0| 3.93736E7|
|2011.0| 4.49155E7|
|2012.0| 4.39383E7|
|2013.0| 4.18887E7|
|2014.0| 1.44796E7|
|2015.0| 1.30464E7|
|2016.0| 1.14759E7|
+------+----------+



In [21]:

task7(df=df)

+----+-----+-------------+
|Year|Month|Max Low Price|
+----+-----+-------------+
|2010|   01|       213.25|
|2010|   02|        202.0|
|2010|   03|       234.46|
|2010|   04|       268.19|
|2010|   05|       262.88|
|2010|   06|        271.5|
|2010|   07|        260.3|
|2010|   08|       260.55|
|2010|   09|       291.01|
|2010|   10|       314.29|
|2010|   11|       316.76|
|2010|   12|        325.1|
|2011|   01|       344.44|
|2011|   02|        360.5|
|2011|   03|       357.75|
|2011|   04|        350.3|
|2011|   05|       346.88|
|2011|   06|       344.65|
|2011|   07|       399.68|
|2011|   08|       392.37|
|2011|   09|        412.0|
|2011|   10|       415.99|
|2011|   11|       401.56|
|2011|   12|       403.49|
|2012|   01|       453.07|
|2012|   02|        535.7|
|2012|   03|       610.31|
|2012|   04|        626.0|
|2012|   05|       581.23|
|2012|   06|        583.1|
|2012|   07|        606.0|
|2012|   08|       673.54|
|2012|   09|       699.57|
|2012|   10|       665.55|
|

In [22]:

task8(df=df)

Mean: 315.91
Standard Deviation: 186.9


### Tasks :
1. Read the csv file correctly!
2. Find out about the schema of data.
3. For those records with closing price less than 500, select opening, closing and volume
and show them.
4. Find out records with opening price more than 200 and closing price less than 200.
5. Extract the year from the date and save it in a new column.
6. Now, for each year, show the minimum volumes traded, shown in a column named
‘minVolume’.
7. Follow quite the same procedure as the previous step, but now for each year and
month, show the highest low price, shown in a column named ‘maxLow’.
8. For the last step, calculate mean and standard deviation of high price over the whole
data frame and show them in two decimal places.