In [1]:
# Load the sample people dataset once. DataFrames are immutable, so `df` can
# share the same object as `people` instead of reading the file a second time.
people = spark.read.json('resources/people.json')
df = people

In [2]:
# Load the sample users dataset from a Parquet file.
users = spark.read.parquet('resources/users.parquet')

In [3]:
# Load the sample employees dataset once. DataFrames are immutable, so `df2`
# can share the same object as `employees` instead of re-reading the file.
employees = spark.read.json('resources/employees.json')
df2 = employees

In [4]:
# Print the contents of the people DataFrame as a text table.
people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# Print the contents of the users DataFrame as a text table.
users.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [6]:
# Print the contents of the employees DataFrame as a text table.
employees.show()

+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+



#### agg
* Aggregate on the entire DataFrame without groups (shorthand for `df.groupBy().agg()`)

In [7]:
# A {column: aggregate-name} dict: compute the maximum age over the whole DataFrame.
df.agg({"age": "max"}).collect()

[Row(max(age)=30)]

In [8]:
from pyspark.sql import functions as F
# Same kind of whole-DataFrame aggregate, expressed as a Column expression:
# the minimum of the age column.
df.agg(F.min(df.age)).collect()

[Row(min(age)=19)]

#### alias
* Returns a new DataFrame with an alias set.

In [9]:
# Import only what this cell needs: a wildcard import from
# pyspark.sql.functions shadows Python builtins such as min, max, sum, abs
# and round with their Column-based counterparts.
from pyspark.sql.functions import col

# Alias the same DataFrame twice so each side of the self-join can be
# referenced unambiguously through its alias-qualified column names.
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()

[Row(name='Michael', name='Michael', age=None),
 Row(name='Andy', name='Andy', age=30),
 Row(name='Justin', name='Justin', age=19)]

#### columns
* Returns all column names as a list.

In [10]:
# List the DataFrame's column names.
df.columns

['age', 'name']

#### describe
* Computes statistics for numeric and string columns.
* This includes count, mean, stddev, min, and max.
* If no columns are given, this function computes statistics for all numerical or string columns.

In [11]:
# Summary statistics with no explicit column list; the output here covers
# only the age column.
df.describe().show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|              24.5|
| stddev|7.7781745930520225|
|    min|                19|
|    max|                30|
+-------+------------------+



#### distinct
* Returns a new DataFrame containing the distinct rows in this DataFrame.

In [12]:
# The three rows are already unique, so distinct() returns all of them.
df.distinct().collect()

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

#### drop
* Returns a new DataFrame that drops the specified column

In [13]:
# drop() returns a new DataFrame and leaves `df` untouched, so the JSON file
# does not need to be re-read before (or after) dropping the column.
df.drop('age').collect()

[Row(name='Michael'), Row(name='Andy'), Row(name='Justin')]

#### dropDuplicates
* Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
* For a static batch DataFrame, it just drops duplicate rows.

In [14]:
from pyspark.sql import Row

# Two identical rows plus one that differs only in age, used by the
# dropDuplicates examples that follow. Line breaks inside the brackets need
# no backslash continuation.
f = sc.parallelize([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80),
]).toDF()

In [15]:
# With no arguments rows are compared on every column: the two identical
# Alice rows collapse into one.
f.dropDuplicates().show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+



In [16]:
# Compare rows on name and height only: all three rows match on those
# columns, so a single row is kept.
f.dropDuplicates(['name', 'height']).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+



#### dropna
* Returns a new DataFrame omitting rows with null values
* how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
* thresh – int, default None. If specified, drop rows that have fewer than thresh non-null values. This overrides the how parameter.
* subset – optional list of column names to consider.

In [17]:
# Drop every row that contains a null in any column (Michael has no age).
df.na.drop(how='any').show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



#### explain
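* Prints the (logical and physical) plans to the console for debugging purposes. Called with no arguments, only the physical plan is printed; pass `True` to include the logical plans as well.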

In [18]:
# Print the plan Spark will use to produce df (physical plan only, as the
# output below shows).
df.explain()

== Physical Plan ==
*Scan json [age#180L,name#181] Format: JSON, InputPaths: file:/Users/priyakallakuri/Documents/Apache Spark as a Platform for IoT/resources/people.json, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint,name:string>


#### fillna
* Replace null values

In [19]:
# Small DataFrame with a null in each column to demonstrate per-column fills.
df4 = spark.createDataFrame([(None, "joseph"), (12, "james"), (31, None)], ["age", "name"])
# The dict maps column name -> value used to replace nulls in that column.
df4.na.fill({'age': 50, 'name': 'unknown'}).show()

+---+-------+
|age|   name|
+---+-------+
| 50| joseph|
| 12|  james|
| 31|unknown|
+---+-------+



#### filter
* Filters rows using the given condition.
* `where` is an alias for `filter`.

In [20]:
# Keep rows whose age is greater than 20; Michael's null age does not
# satisfy the condition, so only Andy remains.
df.filter(df.age > 20).collect()

[Row(age=30, name='Andy')]

In [21]:
# where() takes the same kind of Column condition as filter().
df.where(df.age == 19).collect()

[Row(age=19, name='Justin')]

In [22]:
# The condition can also be written as a SQL expression string.
df.where("age = 19").collect()

[Row(age=19, name='Justin')]

### Same as RDD
#### first
#### foreach
#### foreachPartition

#### groupBy
* Groups the DataFrame using the specified columns, so we can run aggregation on them. 
* See GroupedData for all the available aggregate functions.

In [23]:
# groupBy() with no columns treats the whole DataFrame as a single group;
# avg() with no arguments yields the average of age here.
df.groupBy().avg().collect()

[Row(avg(age)=24.5)]

In [24]:
# Group by name and apply a {column: aggregate-name} dict; the 'mean'
# aggregate is reported under the column name avg(age).
df.groupBy('name').agg({'age': 'mean'}).collect()

[Row(name='Michael', avg(age)=None),
 Row(name='Andy', avg(age)=30.0),
 Row(name='Justin', avg(age)=19.0)]

In [25]:
# Same grouping, with the key given as a Column object instead of a string.
df.groupBy(df.name).avg().collect()

[Row(name='Michael', avg(age)=None),
 Row(name='Andy', avg(age)=30.0),
 Row(name='Justin', avg(age)=19.0)]

In [26]:
# Grouping keys passed as one list that mixes a column name and a Column object.
df.groupBy(['name', df.age]).count().collect()

[Row(name='Andy', age=30, count=1),
 Row(name='Michael', age=None, count=1),
 Row(name='Justin', age=19, count=1)]

#### head
* Returns the first n rows; when called with no argument, returns just the first row as a single Row.

In [27]:
# Called without n, head() returns a single Row rather than a list.
df.head()

Row(age=None, name='Michael')

#### intersect
* Returns a new DataFrame containing only the rows that appear in both DataFrames.

In [28]:
#### join

In [29]:
# Full outer join on name. Berta exists only in df2 (employees), so her row
# has no match in df, and selecting df.name yields None for it.
df.join(df2, df.name == df2.name, 'outer').select(df.name).collect()

[Row(name='Michael'), Row(name='Andy'), Row(name=None), Row(name='Justin')]

In [30]:
#### schema

In [31]:
# The DataFrame's schema as a StructType of StructFields.
df.schema

StructType(List(StructField(age,LongType,true),StructField(name,StringType,true)))

In [32]:
#### select

In [33]:
# '*' selects every column.
df.select('*').collect()

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [34]:
# Select one column as-is plus a computed expression, which alias() names 'age'.
df.select(df.name, (df.age + 10).alias('age')).collect()

[Row(name='Michael', age=None),
 Row(name='Andy', age=40),
 Row(name='Justin', age=29)]

In [35]:
#### selectExpr

In [36]:
# Each argument is a SQL expression string; the result columns are named
# after the expressions themselves.
df.selectExpr("age * 2", "abs(age)").collect()

[Row((age * 2)=None, abs(age)=None),
 Row((age * 2)=60, abs(age)=30),
 Row((age * 2)=38, abs(age)=19)]

In [37]:
#### sort

In [38]:
# Descending sort on age; the row with a null age ends up last.
df.sort(df.age.desc()).collect()

[Row(age=30, name='Andy'),
 Row(age=19, name='Justin'),
 Row(age=None, name='Michael')]

In [39]:
# orderBy() called with the same descending-age expression; the result
# matches the sort() call.
df.orderBy(df.age.desc()).collect()

[Row(age=30, name='Andy'),
 Row(age=19, name='Justin'),
 Row(age=None, name='Michael')]

In [40]:
# Import only the function used below; a wildcard import from
# pyspark.sql.functions shadows Python builtins such as min, max, sum and abs.
from pyspark.sql.functions import asc

In [41]:
# Ascending sort via the asc() function; the row with a null age comes first.
df.sort(asc("age")).collect()

[Row(age=None, name='Michael'),
 Row(age=19, name='Justin'),
 Row(age=30, name='Andy')]

In [42]:
#### withColumn

In [43]:
# Add a new column age2 computed from age; a null age stays null.
df.withColumn('age2', df.age + 2).collect()

[Row(age=None, name='Michael', age2=None),
 Row(age=30, name='Andy', age2=32),
 Row(age=19, name='Justin', age2=21)]

In [44]:
#### withColumnRenamed

In [45]:
# Rename the age column to age2; the values are unchanged.
df.withColumnRenamed('age', 'age2').collect()

[Row(age2=None, name='Michael'),
 Row(age2=30, name='Andy'),
 Row(age2=19, name='Justin')]

#### storageLevel
#### subtract
#### take
#### union
#### unpersist

In [46]:
#### toDF

In [47]:
# Rename all columns positionally: age -> f1, name -> f2.
df.toDF('f1', 'f2').collect()

[Row(f1=None, f2='Michael'), Row(f1=30, f2='Andy'), Row(f1=19, f2='Justin')]

In [48]:
#### toPandas()

In [49]:
# Convert to a pandas DataFrame; age is shown as a float, with a missing
# value for Michael.
df.toPandas()

Unnamed: 0,age,name
0,,Michael
1,30.0,Andy
2,19.0,Justin
