In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName('kk').getOrCreate()

In [24]:
from pyspark.sql.functions import (dayofmonth,hour,dayofyear,dayofweek,month,date_format,format_number,year,weekofyear,avg)

In [12]:
df = spark.read.csv("Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv", inferSchema=True,header=True)

In [13]:
df.select(['Date','Open']).show()

+----------+------------------+
|      Date|              Open|
+----------+------------------+
|2010-01-04|        213.429998|
|2010-01-05|        214.599998|
|2010-01-06|        214.379993|
|2010-01-07|            211.75|
|2010-01-08|        210.299994|
|2010-01-11|212.79999700000002|
|2010-01-12|209.18999499999998|
|2010-01-13|        207.870005|
|2010-01-14|210.11000299999998|
|2010-01-15|210.92999500000002|
|2010-01-19|        208.330002|
|2010-01-20|        214.910006|
|2010-01-21|        212.079994|
|2010-01-22|206.78000600000001|
|2010-01-25|202.51000200000001|
|2010-01-26|205.95000100000001|
|2010-01-27|        206.849995|
|2010-01-28|        204.930004|
|2010-01-29|        201.079996|
|2010-02-01|192.36999699999998|
+----------+------------------+
only showing top 20 rows



In [14]:
 df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [20]:
df.select(year('Date')).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [117]:
newdf = df.withColumn('Year',year('Date'))

In [118]:
naya = newdf.groupBy('Year').avg().select(["Year","avg(Close)"])

In [119]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [120]:
naya = naya.select(['Year',format_number('avg(Close)',3).alias("Closing_Price")]).orderBy('Year')

In [121]:
naya.show()

+----+-------------+
|Year|Closing_Price|
+----+-------------+
|2010|      259.842|
|2011|      364.004|
|2012|      576.050|
|2013|      472.635|
|2014|      295.402|
|2015|      120.040|
|2016|      104.604|
+----+-------------+



In [122]:
my_window = Window.partitionBy().orderBy("Year")

In [123]:
naya = naya.withColumn('prev_yr',F.lag(naya['Closing_Price']).over(my_window))

In [124]:
naya.show()

+----+-------------+-------+
|Year|Closing_Price|prev_yr|
+----+-------------+-------+
|2010|      259.842|   null|
|2011|      364.004|259.842|
|2012|      576.050|364.004|
|2013|      472.635|576.050|
|2014|      295.402|472.635|
|2015|      120.040|295.402|
|2016|      104.604|120.040|
+----+-------------+-------+



In [74]:
## Adding + and minus
naya.withColumn('Percentage Change',F.when(F.isnull(naya['Closing_Price'] - naya['prev_yr']),0)
             .when(naya['Closing_Price'] > naya['prev_yr'],F.concat(F.lit("+"),(naya['Closing_Price']-naya['prev_yr'])/(naya['prev_yr']*100)))
             .otherwise(F.concat(F.lit("-"),(naya['prev_yr']-naya['Closing_Price'])/(naya['Closing_Price']*100)))).show()

+----+-------+-------+--------------------+
|Year|main_yr|prev_yr|   Percentage Change|
+----+-------+-------+--------------------+
|2010|259.842|   null|                   0|
|2011|364.004|259.842| +0.0040086668052124|
|2012|576.050|364.004|+0.00582537554532...|
|2013|472.635|576.050|-0.00218805209093...|
|2014|295.402|472.635|-0.00599972241217...|
|2015|120.040|295.402|-0.01460863045651...|
|2016|104.604|120.040|-0.00147566058659...|
+----+-------+-------+--------------------+



In [84]:
## Just Profit and loss
naya.withColumn('Percentage Change',F.when(F.isnull(naya['Closing_Price'] - naya['prev_yr']),'None')
             .when(naya['Closing_Price'] > naya['prev_yr'],'Profit')
             .otherwise('Loss')).show()

+----+-------+-------+-----------------+-------------------+-------------------+
|Year|main_yr|prev_yr|Percentage Change|   PercentageChange|  Percentage_Change|
+----+-------+-------+-----------------+-------------------+-------------------+
|2010|259.842|   null|             None|                  0|                  0|
|2011|364.004|259.842|           Profit|    40.086668052124|    40.086668052124|
|2012|576.050|364.004|           Profit|  58.25375545323676|  58.25375545323676|
|2013|472.635|576.050|             Loss|-17.952434684489187|-17.952434684489187|
|2014|295.402|472.635|             Loss| -37.49891565372857| -37.49891565372857|
|2015|120.040|295.402|             Loss| -59.36384994008165| -59.36384994008165|
|2016|104.604|120.040|             Loss| -12.85904698433856| -12.85904698433856|
+----+-------+-------+-----------------+-------------------+-------------------+



In [105]:
naya = naya.withColumn('Percentage_Change',F.when(F.isnull(naya['Closing_Price'] - naya['prev_yr']),0)
             .otherwise(((naya['Closing_Price']-naya['prev_yr'])/(naya['prev_yr']))*100))

In [106]:
naya.show()

+----+-------+-------+-------------------+
|Year|main_yr|prev_yr|  Percentage_Change|
+----+-------+-------+-------------------+
|2010|259.842|   null|                0.0|
|2011|364.004|259.842|    40.086668052124|
|2012|576.050|364.004|  58.25375545323676|
|2013|472.635|576.050|-17.952434684489187|
|2014|295.402|472.635| -37.49891565372857|
|2015|120.040|295.402| -59.36384994008165|
|2016|104.604|120.040| -12.85904698433856|
+----+-------+-------+-------------------+



In [107]:
naya.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- main_yr: string (nullable = true)
 |-- prev_yr: string (nullable = true)
 |-- Percentage_Change: double (nullable = true)



In [111]:
naya.select(['Year','Closing_Price',format_number('Percentage_Change',3).alias('Percentage Change')]).show()

+----+-------+-----------------+
|Year|main_yr|Percentage Change|
+----+-------+-----------------+
|2010|259.842|            0.000|
|2011|364.004|           40.087|
|2012|576.050|           58.254|
|2013|472.635|          -17.952|
|2014|295.402|          -37.499|
|2015|120.040|          -59.364|
|2016|104.604|          -12.859|
+----+-------+-----------------+



In [None]:
# +----+-------+-----------------+
# |Year|main_yr|Percentage Change|
# +----+-------+-----------------+
# |2010|259.842|            0.000|
# |2011|364.004|           40.087|
# |2012|576.050|           58.254|
# |2013|472.635|          -17.952|
# |2014|295.402|          -37.499|
# |2015|120.040|          -59.364|
# |2016|104.604|          -12.859|
# +----+-------+-----------------+