In [0]:
## Spark DataFrame
#Groupby and Aggregate Functions

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('aggs').getOrCreate()

In [0]:
df = spark.read.table('sales_info_csv')

In [0]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [0]:
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: float (nullable = true)



In [0]:
df.groupBy('Company')

Out[6]: <pyspark.sql.group.GroupedData at 0x7f86c96dcee0>

In [0]:
df.groupBy('Company').mean().show()
df.groupBy('Company').min().show()
df.groupBy('Company').count().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [0]:
df.agg({'Sales':'sum'}).show()
df.agg({'Sales':'max'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [0]:
group_data = df.groupBy('Company')
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [0]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [0]:
df.select(countDistinct('Sales')).show()
df.select(stddev('Sales').alias('Standard Deviation Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+

+------------------------+
|Standard Deviation Sales|
+------------------------+
|      250.08742410799007|
+------------------------+



In [0]:
df.select(stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [0]:
from pyspark.sql.functions import format_number

In [0]:
sales_std = df.select(stddev('Sales').alias('STD'))
sales_std

Out[29]: DataFrame[STD: double]

In [0]:
sales_std.select(format_number('STD',2).alias('STD')).show()

+------+
|   STD|
+------+
|250.09|
+------+



In [0]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [0]:
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [0]:
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [0]:
#Missing Data

In [0]:
df = spark.read.table('containsnull_csv')
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
#Threshold of at least 2 non-null values
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
df.na.drop(subset = ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
df.printSchema()
df.na.fill('FILL VALUE').show()
df.na.fill(2).show()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: float (nullable = true)

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  2.0|
|emp2| null|  2.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
df.na.fill('No Name',subset = ['Name']).show()


+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [0]:
from pyspark.sql.functions import mean
mean_val = df.select(mean(df['Sales'])).collect()
mean_sales = mean_val[0][0]

In [0]:
df.na.fill(mean_sales,subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0],subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [0]:
#Dates and Timestamps

In [0]:
df = spark.read.table('appl_stock_2_csv')
df.head(1)

Out[59]: [Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.42999267578125, High=214.5, Low=212.3800048828125, Close='214.009998', Volume=123432400.0, Adj Close=27.727039337158203)]

In [0]:
df.show()

+-------------------+---------+---------+---------+------------------+------------+---------+
|               Date|     Open|     High|      Low|             Close|      Volume|Adj Close|
+-------------------+---------+---------+---------+------------------+------------+---------+
|2010-01-04 00:00:00|   213.43|    214.5|   212.38|        214.009998|  1.234324E8| 27.72704|
|2010-01-05 00:00:00|214.59999|   215.59|   213.25|        214.379993|1.50476192E8|27.774977|
|2010-01-06 00:00:00|214.37999|   215.23|   210.75|        210.969995|    1.3804E8|27.333178|
|2010-01-07 00:00:00|   211.75|    212.0|   209.05|            210.58|  1.192828E8| 27.28265|
|2010-01-08 00:00:00|210.29999|    212.0|   209.06|211.98000499999998|1.11902704E8|27.464033|
|2010-01-11 00:00:00|    212.8|    213.0|208.45001|210.11000299999998|  1.155574E8|27.221758|
|2010-01-12 00:00:00|   209.19|209.76999|   206.42|        207.720001|1.48614896E8| 26.91211|
|2010-01-13 00:00:00|207.87001|   210.93|204.09999|        2

In [0]:
df.select(['Date','Open']).show()

+-------------------+---------+
|               Date|     Open|
+-------------------+---------+
|2010-01-04 00:00:00|   213.43|
|2010-01-05 00:00:00|214.59999|
|2010-01-06 00:00:00|214.37999|
|2010-01-07 00:00:00|   211.75|
|2010-01-08 00:00:00|210.29999|
|2010-01-11 00:00:00|    212.8|
|2010-01-12 00:00:00|   209.19|
|2010-01-13 00:00:00|207.87001|
|2010-01-14 00:00:00|   210.11|
|2010-01-15 00:00:00|   210.93|
|2010-01-19 00:00:00|   208.33|
|2010-01-20 00:00:00|   214.91|
|2010-01-21 00:00:00|212.07999|
|2010-01-22 00:00:00|   206.78|
|2010-01-25 00:00:00|   202.51|
|2010-01-26 00:00:00|   205.95|
|2010-01-27 00:00:00|206.84999|
|2010-01-28 00:00:00|204.93001|
|2010-01-29 00:00:00|   201.08|
|2010-02-01 00:00:00|   192.37|
+-------------------+---------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import dayofmonth, hour, dayofyear, month, year, weekofyear, format_number, date_format

In [0]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [0]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [0]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [0]:
#df.select(year(df['Date'])).show()
newdf = df.withColumn("Year",year(df['Date']))

In [0]:
result = newdf.groupBy('Year').mean().select(["Year","avg(Adj Close)"])
result.show()

+----+------------------+
|Year|    avg(Adj Close)|
+----+------------------+
|2015|115.96740074763223|
|2013| 62.61798772357759|
|2014| 87.63583331637912|
|2012| 74.81383665466309|
|2016|103.15032883295937|
|2010| 33.66507240325686|
|2011| 47.16023699442545|
+----+------------------+



In [0]:
new = result.withColumnRenamed("avg(Adj Close)","Avg Closing Price Adjusted")
new.show()

+----+--------------------------+
|Year|Avg Closing Price Adjusted|
+----+--------------------------+
|2015|        115.96740074763223|
|2013|         62.61798772357759|
|2014|         87.63583331637912|
|2012|         74.81383665466309|
|2016|        103.15032883295937|
|2010|         33.66507240325686|
|2011|         47.16023699442545|
+----+--------------------------+



In [0]:
new.select(['Year',format_number('Avg Closing Price Adjusted',2).alias('Avg Closing Price Adj')]).show()

+----+---------------------+
|Year|Avg Closing Price Adj|
+----+---------------------+
|2015|               115.97|
|2013|                62.62|
|2014|                87.64|
|2012|                74.81|
|2016|               103.15|
|2010|                33.67|
|2011|                47.16|
+----+---------------------+

