In [248]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if one exists, otherwise start one named 'my-practice'.
spark = (
    SparkSession.builder
    .appName('my-practice')
    .getOrCreate()
)

In [249]:
# Load the stock CSV: the first line supplies the column names and Spark infers
# each column's type from the data.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv('walmart_stock.csv')

In [250]:
# Preview the loaded data; show() with no arguments prints the first 20 rows.
df.show()

+--------------------+------------------+------------------+------------------+------------------+--------+------------------+
|                Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+--------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03 00:00:...|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:...|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:...|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06 00:00:...|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09 00:00:...|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.6

In [251]:
# Print the column names, types and nullability that inferSchema produced.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [252]:
# First three opening prices.
open_only = df.select('Open')
open_only.show(n=3)

+------------------+
|              Open|
+------------------+
|         59.970001|
|60.209998999999996|
|         59.349998|
+------------------+
only showing top 3 rows



In [253]:
# First four rows of the Open and Close columns side by side.
open_close = df.select('Open', 'Close')
open_close.show(n=4)

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|         59.970001|         60.330002|
|60.209998999999996|59.709998999999996|
|         59.349998|         59.419998|
|         59.419998|              59.0|
+------------------+------------------+
only showing top 4 rows



In [254]:
# Column names, in DataFrame order.
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [255]:
# Print the first five Row objects, each followed by extra blank lines.
for record in df.take(5):
    print(record)
    print('\n')

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)




In [256]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [257]:
# from pyspark.sql.types import StructField,StringType,IntegerType,StructType,DateType

In [258]:
# schema_type = [
#     StructField('Date',DateType(),True),
#     StructField('Open',IntegerType(),True),
#     StructField('High', IntegerType(),True),
#     StructField('Low', IntegerType(),True),
#     StructField('Close', IntegerType(),True),
#     StructField('Volume', IntegerType(),True),
#     StructField('Adj Close', IntegerType(),True)
# ]

In [259]:
from pyspark.sql.functions import format_number

In [260]:
# Keep the summary table in a variable so its columns can be cast and reformatted.
result = df.describe()

In [261]:
# Raw, unformatted summary values.
result.show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [262]:
# Tidy the summary table: the four price columns are cast to float and rendered
# with two decimals; Volume is cast to an integer. 'Adj Close' is not selected.
summary_columns = [result['summary']]
summary_columns += [
    format_number(result[price].cast('float'), 2).alias(price)
    for price in ('Open', 'High', 'Low', 'Close')
]
summary_columns.append(result['Volume'].cast('int').alias('Volume'))
result.select(summary_columns).show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519781|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



In [263]:
# Explicit schema for walmart_stock.csv, mirroring the types reported by
# df.printSchema() for the inferSchema-loaded frame (timestamp Date, double
# prices, integer Volume). The import and the field list live in this cell so
# it runs on a fresh kernel: the earlier cells that would have provided
# StructType / schema_type are commented out, which made this line a NameError.
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StructField,
    StructType,
    TimestampType,
)

schema_type = [
    StructField('Date', TimestampType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True),
    StructField('Adj Close', DoubleType(), True),
]
final_struc = StructType(fields=schema_type)

In [264]:
# df = spark.read.csv('walmart_stock.csv',schema=final_struc)

In [265]:
# Re-check the schema: df is still the inferSchema-loaded frame, because the
# explicit-schema read in the previous cell is commented out.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [266]:
 # HV: each row's High divided by that row's Volume.
 high_per_volume = df['High'] / df['Volume']
 new_df = df.withColumn('HV', high_per_volume)

In [267]:
# Inspect the new ratio column (show() prints the first 20 rows by default).
new_df.select('HV').show()

+--------------------+
|                  HV|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



In [268]:
# Date of the row with the highest High: sort descending, take the top row, read its Date.
peak_row = df.orderBy('High', ascending=False).head(1)[0]
peak_row['Date']


datetime.datetime(2015, 1, 13, 0, 0)

In [269]:
from pyspark.sql.functions import mean, avg, count, max, min

In [270]:
# Three small aggregate tables over new_df, with a blank-line print between them.
# mean/count/max/min are the pyspark.sql.functions versions imported in the
# previous cell (max and min shadow the Python builtins from that point on).
aggregate_sets = (
    (mean('Close'),),
    (count('Close'),),
    (max('Volume'), min('Volume')),
)
for position, aggregates in enumerate(aggregate_sets):
    if position > 0:
        print('\n')
    new_df.select(*aggregates).show()


+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



+------------+
|count(Close)|
+------------+
|        1258|
+------------+



+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [271]:
# Number of rows whose Close is below 60.
below_sixty = new_df.where(new_df['Close'] < 60)
below_sixty.count()

81

In [272]:
# Numerator: rows whose High exceeded 80.
top = new_df.filter(new_df['High'] > 80).count()
# Denominator: every row in the frame. Counting directly (instead of through a
# dummy `High > 0` filter) keeps rows with a null or non-positive High in the
# total, so the ratio is a true share of all rows.
bottom = new_df.count()

In [273]:
# Share of rows counted in `top` relative to `bottom`, expressed as a percentage.
percentage = (top / bottom) * 100
rounded_percentage = round(percentage, 2)
print('%', rounded_percentage)

% 9.14


In [274]:
from pyspark.sql.functions import corr
# Correlation coefficient between High and Volume, as computed by pyspark's corr().
high_volume_corr = corr('High', 'Volume')
df.select(high_volume_corr).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [288]:
from pyspark.sql.functions import year
from pyspark.sql.functions import month

In [276]:
# Add a Year column extracted from the Date timestamp.
new_df = new_df.withColumn('Year', year('Date'))

In [283]:
# Per-year maxima. max() with no arguments aggregates every numeric column,
# which is why the result also carries max(HV) and max(Year).
max_df = (
    new_df
    .groupBy('Year')
    .max()
)
max_df.show()

+----+-----------------+---------+---------+----------+-----------+-----------------+--------------------+---------+
|Year|        max(Open)|max(High)| max(Low)|max(Close)|max(Volume)|   max(Adj Close)|             max(HV)|max(Year)|
+----+-----------------+---------+---------+----------+-----------+-----------------+--------------------+---------+
|2015|        90.800003|90.970001|    89.25| 90.470001|   80898100|84.91421600000001|2.460931126147897...|     2015|
|2013|        81.209999|81.370003|    80.82| 81.209999|   25683700|        73.929868|3.734784381116044E-5|     2013|
|2014|87.08000200000001|88.089996|86.480003| 87.540001|   22812400|81.70768000000001|3.494261176659442...|     2014|
|2012|        77.599998|77.599998|76.690002| 77.150002|   38007300|        68.568371|2.368837716882401...|     2012|
|2016|             74.5|75.190002|73.629997| 74.300003|   35076700|        73.233524|1.712227308529861...|     2016|
+----+-----------------+---------+---------+----------+---------

In [287]:
# Highest High per year, in chronological order.
yearly_high = max_df.select('Year', 'max(High)')
yearly_high.sort('Year').show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



In [289]:
# Add a Month column (1-12) extracted from the Date timestamp.
new_df = new_df.withColumn('Month', month(new_df['Date']))


In [297]:
# Per-month averages across all years. avg() with no arguments averages every
# numeric column, so the result also carries avg(Year) and avg(Month).
mean_df = (
    new_df
    .groupBy('Month')
    .avg()
)
mean_df.show()

+-----+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+--------------------+------------------+----------+
|Month|        avg(Open)|        avg(High)|         avg(Low)|       avg(Close)|      avg(Volume)|   avg(Adj Close)|             avg(HV)|         avg(Year)|avg(Month)|
+-----+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+--------------------+------------------+----------+
|   12|72.87952850943395|73.35566025471698|72.44481152830188|72.84792478301885|7967959.433962264|68.46031040566042|1.164205323990347...|2014.0283018867924|      12.0|
|    1|71.40811884158416|71.97009924752473|70.90425712871289|71.44801958415842|8761851.485148516|65.56887865346533|9.384442284041966E-6| 2013.970297029703|       1.0|
|    6| 72.5100938962264| 72.9262265471698|72.12198099056603| 72.4953774245283|8303756.603773585|67.43827906603772|9.928110109060035E-6|2014.0377358490566|       6.0

In [298]:

# Average Close per calendar month, January through December.
monthly_close = mean_df.select('Month', 'avg(Close)')
monthly_close.sort('Month').show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

