In [4]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('miss').getOrCreate()

In [7]:
filename = '../Python-and-Spark-for-Big-Data-master/Spark_DataFrames/ContainsNull.csv'
df = spark.read.csv(filename, header=True, inferSchema=True)

In [8]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [11]:
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [12]:
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [13]:
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [14]:
df.na.fill('FILL VALUE').show()   #Fill Value on String type only

+----+----------+-----+
|  Id|      Name|Sales|
+----+----------+-----+
|emp1|      John| null|
|emp2|FILL VALUE| null|
|emp3|FILL VALUE|345.0|
|emp4|     Cindy|456.0|
+----+----------+-----+



In [15]:
df.na.fill('No Name', subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [16]:
from pyspark.sql.functions import mean

In [19]:
mean_val = df.select(mean(df['Sales'])).collect()
mean_val[0]
mean_sales = mean_val[0][0]

In [20]:
df.na.fill(mean_sales, subset=['Sales']).show()
# df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], ['Sales']).show()         # One line, ugly way

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [22]:
spark = SparkSession.builder.appName('dates').getOrCreate()

In [23]:
filename = '../Python-and-Spark-for-Big-Data-master/Spark_DataFrames/appl_stock.csv'
df = spark.read.csv(filename, header=True, inferSchema=True)

In [25]:
df.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [27]:
from pyspark.sql.functions import (dayofmonth, hour,
                                  dayofyear, month,
                                  year, weekofyear,
                                  format_number, date_format)

In [28]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [29]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [37]:
# Add new column 
#df.select(year(df['Date'])).show()
df.withColumn('Year', year(df['Date'])).show()
newdf = df.withColumn('Year', year(df['Date']))

+--------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|                Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+--------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04 00:00:...|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:...|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06 00:00:...|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07 00:00:...|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08 00:00:...|        210.299994|        212.000006|209.06000

In [42]:
# GroupBy year 
result = newdf.groupBy('Year').mean().select(['Year', 'avg(Close)'])
result.withColumnRenamed('avg(Close)', 'Average Closing Price').show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2015|   120.03999980555547|
|2013|    472.6348802857143|
|2014|    295.4023416507935|
|2012|    576.0497195640002|
|2016|   104.60400786904763|
|2010|    259.8424600000002|
|2011|   364.00432532142867|
+----+---------------------+



In [44]:
# Change name of column, format the column
result.select(['Year', format_number('avg(Close)', 2).alias('AVG close')]).show()

+----+---------+
|Year|AVG close|
+----+---------+
|2015|   120.04|
|2013|   472.63|
|2014|   295.40|
|2012|   576.05|
|2016|   104.60|
|2010|   259.84|
|2011|   364.00|
+----+---------+

