In [9]:
import os
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext, SparkConf

In [10]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import countDistinct, avg
from pyspark.sql.functions import dayofmonth, dayofyear, year, month, hour, weekofyear, date_format
from pyspark.sql.functions import col as func_col

In [11]:
app_name = 'Saravanan : Spark SQL'
app_name

'Saravanan : Spark SQL'

In [12]:
spark = SparkSession.builder.appName(app_name).getOrCreate()

In [13]:
def get_hdfs_filepath(file_name):
    my_hdfs = "hdfs://localhost:9000/module6_datasets/DataSets/"
    #my_hdfs = "/module6_datasets/DataSets/"
    return os.path.join(my_hdfs, file_name)

In [14]:
EMP_JSON = get_hdfs_filepath('emp.json')
EMP_CSV = get_hdfs_filepath('emp.csv') # comma delimited
DEPT_CSV = get_hdfs_filepath('dept.csv')

PEOPLE_JSON = get_hdfs_filepath('people.json')
GOOG_CSV = get_hdfs_filepath('goog_stock.csv')
NESTED_JSON = get_hdfs_filepath('nested.json')

In [15]:
print(EMP_JSON)

hdfs://localhost:9000/module6_datasets/DataSets/emp.json


In [16]:
emp_json_df = spark.read.json(EMP_JSON)

In [17]:
emp_json_df.show()

+---+---+------+------+
|age| id|  name|salary|
+---+---+------+------+
| 30|  1|  Bill| 10000|
| 40|  2| Steve| 12000|
| 50|  3|Donald| 14000|
| 60|  4|  Modi| 18000|
| 70|  5|Sunder| 22000|
| 80|  6|  Jeff| 26000|
| 90|  7|Sergey| 30000|
+---+---+------+------+



In [18]:
emp_json_df.describe()

DataFrame[summary: string, age: string, id: string, name: string, salary: string]

In [19]:
emp_json_df.describe().show()

+-------+------------------+-----------------+------+-----------------+
|summary|               age|               id|  name|           salary|
+-------+------------------+-----------------+------+-----------------+
|  count|                 7|                7|     7|                7|
|   mean|              60.0|              4.0|  null|18857.14285714286|
| stddev|21.602468994692867|2.160246899469287|  null|7470.577207252718|
|    min|                30|                1|  Bill|            10000|
|    max|                90|                7|Sunder|            30000|
+-------+------------------+-----------------+------+-----------------+



In [20]:
emp_json_df.head(2)

[Row(age=30, id=1, name='Bill', salary=10000),
 Row(age=40, id=2, name='Steve', salary=12000)]

In [21]:
emp_json_df.columns

['age', 'id', 'name', 'salary']

In [22]:
emp_json_df.printSchema()

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)



In [23]:
emp_json_df['salary']

Column<'salary'>

In [24]:
emp_json_df['salary'].show()

TypeError: 'Column' object is not callable

In [25]:
emp_json_df.select("salary")

DataFrame[salary: bigint]

In [26]:
emp_json_df.select("salary").show()

+------+
|salary|
+------+
| 10000|
| 12000|
| 14000|
| 18000|
| 22000|
| 26000|
| 30000|
+------+



In [28]:
emp_json_df.select(['salary']).head(2)

[Row(salary=10000), Row(salary=12000)]

In [29]:
emp_json_df.select(['salary']).show(2)

+------+
|salary|
+------+
| 10000|
| 12000|
+------+
only showing top 2 rows



In [30]:
emp_json_df.select(['salary']).tail(2)

[Row(salary=26000), Row(salary=30000)]

In [31]:
type(emp_json_df['salary'])

pyspark.sql.column.Column

### Reading Nested  JSON

In [33]:
nested = spark.read.json(NESTED_JSON)

In [34]:
nested.printSchema()

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phones: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- reporting: struct (nullable = true)
 |    |-- manager: string (nullable = true)
 |    |-- rm: string (nullable = true)
 |-- salary: long (nullable = true)



In [35]:
nested.show()

+---+---+----+--------------+--------------+------+
|age| id|name|        phones|     reporting|salary|
+---+---+----+--------------+--------------+------+
| 30|  1|Bill|[12345, 56789]|{Steve, Brian}| 10000|
+---+---+----+--------------+--------------+------+



In [36]:
nested.select(['phones','salary']).show()

+--------------+------+
|        phones|salary|
+--------------+------+
|[12345, 56789]| 10000|
+--------------+------+



In [37]:
nested.select(explode('phones').alias("phone"),'age').show()

+-----+---+
|phone|age|
+-----+---+
|12345| 30|
|56789| 30|
+-----+---+



In [38]:
nested.select('reporting','age','salary').show()

+--------------+---+------+
|     reporting|age|salary|
+--------------+---+------+
|{Steve, Brian}| 30| 10000|
+--------------+---+------+



In [39]:
nested.select(nested['reporting'].alias('tmp'),'id','age','salary').show()

+--------------+---+---+------+
|           tmp| id|age|salary|
+--------------+---+---+------+
|{Steve, Brian}|  1| 30| 10000|
+--------------+---+---+------+



In [40]:
nested.select(nested['reporting'].alias('tmp'),'id','age','salary').select('tmp.*','id','age','salary').show()

+-------+-----+---+---+------+
|manager|   rm| id|age|salary|
+-------+-----+---+---+------+
|  Steve|Brian|  1| 30| 10000|
+-------+-----+---+---+------+



In [41]:
nested.select('reporting.*','id','age','salary').show()

+-------+-----+---+---+------+
|manager|   rm| id|age|salary|
+-------+-----+---+---+------+
|  Steve|Brian|  1| 30| 10000|
+-------+-----+---+---+------+



In [42]:
nested.select('*').show()

+---+---+----+--------------+--------------+------+
|age| id|name|        phones|     reporting|salary|
+---+---+----+--------------+--------------+------+
| 30|  1|Bill|[12345, 56789]|{Steve, Brian}| 10000|
+---+---+----+--------------+--------------+------+



In [45]:
nested.select('age','id',explode('phones').alias('Phone'),'reporting.*','salary').show()

+---+---+-----+-------+-----+------+
|age| id|Phone|manager|   rm|salary|
+---+---+-----+-------+-----+------+
| 30|  1|12345|  Steve|Brian| 10000|
| 30|  1|56789|  Steve|Brian| 10000|
+---+---+-----+-------+-----+------+



### Single Column DF

In [46]:
emp_json_df.show()

+---+---+------+------+
|age| id|  name|salary|
+---+---+------+------+
| 30|  1|  Bill| 10000|
| 40|  2| Steve| 12000|
| 50|  3|Donald| 14000|
| 60|  4|  Modi| 18000|
| 70|  5|Sunder| 22000|
| 80|  6|  Jeff| 26000|
| 90|  7|Sergey| 30000|
+---+---+------+------+



In [47]:
emp_json_df.head()

Row(age=30, id=1, name='Bill', salary=10000)

In [48]:
emp_json_df.head(3)

[Row(age=30, id=1, name='Bill', salary=10000),
 Row(age=40, id=2, name='Steve', salary=12000),
 Row(age=50, id=3, name='Donald', salary=14000)]

In [49]:
emp_json_df.show(2,truncate=True)

+---+---+-----+------+
|age| id| name|salary|
+---+---+-----+------+
| 30|  1| Bill| 10000|
| 40|  2|Steve| 12000|
+---+---+-----+------+
only showing top 2 rows



In [50]:
type(emp_json_df)

pyspark.sql.dataframe.DataFrame

In [51]:
type(emp_json_df.head(2)[0])


pyspark.sql.types.Row

In [52]:
emp_json_df.head(2)[0]

Row(age=30, id=1, name='Bill', salary=10000)

### Multiple Columns

In [55]:
emp_json_df.select(['age','name'])

DataFrame[age: bigint, name: string]

In [56]:
emp_json_df.select(['age','name']).show()

+---+------+
|age|  name|
+---+------+
| 30|  Bill|
| 40| Steve|
| 50|Donald|
| 60|  Modi|
| 70|Sunder|
| 80|  Jeff|
| 90|Sergey|
+---+------+



In [57]:
emp_json_df.age

Column<'age'>

In [58]:
emp_json_df.select(emp_json_df.age.alias("emp_age"), "name", (emp_json_df.salary*1.05).alias('rev_sal')).show()

+-------+------+-------+
|emp_age|  name|rev_sal|
+-------+------+-------+
|     30|  Bill|10500.0|
|     40| Steve|12600.0|
|     50|Donald|14700.0|
|     60|  Modi|18900.0|
|     70|Sunder|23100.0|
|     80|  Jeff|27300.0|
|     90|Sergey|31500.0|
+-------+------+-------+



### Adding new column

In [59]:
emp_json_df['newsal']= emp_json_df['salary']*1.05

TypeError: 'DataFrame' object does not support item assignment

In [60]:
emp_json_df.withColumn('newsal',emp_json_df['salary']*1.05).show()

+---+---+------+------+-------+
|age| id|  name|salary| newsal|
+---+---+------+------+-------+
| 30|  1|  Bill| 10000|10500.0|
| 40|  2| Steve| 12000|12600.0|
| 50|  3|Donald| 14000|14700.0|
| 60|  4|  Modi| 18000|18900.0|
| 70|  5|Sunder| 22000|23100.0|
| 80|  6|  Jeff| 26000|27300.0|
| 90|  7|Sergey| 30000|31500.0|
+---+---+------+------+-------+



In [61]:
emp_json_df.columns

['age', 'id', 'name', 'salary']

### Save Changes to DF using

In [62]:
emp_json_df = emp_json_df.withColumn('newsal',emp_json_df['salary']*1.05)

In [63]:
emp_json_df.columns

['age', 'id', 'name', 'salary', 'newsal']

In [64]:
emp_json_df.show()

+---+---+------+------+-------+
|age| id|  name|salary| newsal|
+---+---+------+------+-------+
| 30|  1|  Bill| 10000|10500.0|
| 40|  2| Steve| 12000|12600.0|
| 50|  3|Donald| 14000|14700.0|
| 60|  4|  Modi| 18000|18900.0|
| 70|  5|Sunder| 22000|23100.0|
| 80|  6|  Jeff| 26000|27300.0|
| 90|  7|Sergey| 30000|31500.0|
+---+---+------+------+-------+



### Renaming Columns

In [65]:
emp_json_df.withColumnRenamed('newsal','revived_salary').show()

+---+---+------+------+--------------+
|age| id|  name|salary|revived_salary|
+---+---+------+------+--------------+
| 30|  1|  Bill| 10000|       10500.0|
| 40|  2| Steve| 12000|       12600.0|
| 50|  3|Donald| 14000|       14700.0|
| 60|  4|  Modi| 18000|       18900.0|
| 70|  5|Sunder| 22000|       23100.0|
| 80|  6|  Jeff| 26000|       27300.0|
| 90|  7|Sergey| 30000|       31500.0|
+---+---+------+------+--------------+



In [66]:
emp_json_df.columns

['age', 'id', 'name', 'salary', 'newsal']

### Using SQL View

In [67]:
emp_json_df.createOrReplaceTempView("Emp")
sql_df = spark.sql("select * from Emp")
sql_df.show()

+---+---+------+------+-------+
|age| id|  name|salary| newsal|
+---+---+------+------+-------+
| 30|  1|  Bill| 10000|10500.0|
| 40|  2| Steve| 12000|12600.0|
| 50|  3|Donald| 14000|14700.0|
| 60|  4|  Modi| 18000|18900.0|
| 70|  5|Sunder| 22000|23100.0|
| 80|  6|  Jeff| 26000|27300.0|
| 90|  7|Sergey| 30000|31500.0|
+---+---+------+------+-------+



In [68]:
spark.sql("select * from emp").show()

+---+---+------+------+-------+
|age| id|  name|salary| newsal|
+---+---+------+------+-------+
| 30|  1|  Bill| 10000|10500.0|
| 40|  2| Steve| 12000|12600.0|
| 50|  3|Donald| 14000|14700.0|
| 60|  4|  Modi| 18000|18900.0|
| 70|  5|Sunder| 22000|23100.0|
| 80|  6|  Jeff| 26000|27300.0|
| 90|  7|Sergey| 30000|31500.0|
+---+---+------+------+-------+



In [69]:
spark.sql("select * from emp where newsal>=20000").show()

+---+---+------+------+-------+
|age| id|  name|salary| newsal|
+---+---+------+------+-------+
| 70|  5|Sunder| 22000|23100.0|
| 80|  6|  Jeff| 26000|27300.0|
| 90|  7|Sergey| 30000|31500.0|
+---+---+------+------+-------+



In [70]:
spark.sql("select salary,salary*1.1 as rev_sal from emp where newsal>=20000").show()

+------+-------+
|salary|rev_sal|
+------+-------+
| 22000|24200.0|
| 26000|28600.0|
| 30000|33000.0|
+------+-------+



In [None]:
#spark.stop()

In [72]:
spark.sql("select * from emp where newsal<20000 and age<50").show()

+---+---+-----+------+-------+
|age| id| name|salary| newsal|
+---+---+-----+------+-------+
| 30|  1| Bill| 10000|10500.0|
| 40|  2|Steve| 12000|12600.0|
+---+---+-----+------+-------+



### Dataframe Operations

In [73]:
goog_df = spark.read.csv(GOOG_CSV)

In [74]:
goog_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [75]:
goog_df = spark.read.csv(GOOG_CSV,inferSchema=True)
goog_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [76]:
goog_df = spark.read.csv(GOOG_CSV,header=True)
goog_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [78]:
goog_df = spark.read.csv(GOOG_CSV,inferSchema=True,header=True,)
goog_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [88]:
goog_df = spark.read.csv(GOOG_CSV,inferSchema=True,header=True,timestampFormat="yyyy-MM-dd")

goog_df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [89]:
goog_df.head(3)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)]

In [90]:
goog_df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [91]:
goog_df.filter("Close>200").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [93]:
goog_df.count()

1762

In [94]:
goog_df.filter("Close>200").select('open','Volume').show()

+------------------+---------+
|              open|   Volume|
+------------------+---------+
|        213.429998|123432400|
|        214.599998|150476200|
|        214.379993|138040000|
|            211.75|119282800|
|        210.299994|111902700|
|212.79999700000002|115557400|
|209.18999499999998|148614900|
|        207.870005|151473000|
|210.11000299999998|108223500|
|210.92999500000002|148516900|
|        208.330002|182501900|
|        214.910006|153038200|
|        212.079994|152038600|
|202.51000200000001|266424900|
|205.95000100000001|466777500|
|        206.849995|430642100|
|        198.109995|163867200|
|        201.940002|135934400|
|        204.190001|109099200|
|        201.629995|105706300|
+------------------+---------+
only showing top 20 rows



### Complex filtering using Python operators for comparison

Syntax looks very similar to SQL operators, except we need to ensure that we call the entire column within the dataframe using the format: df['col name']

In [101]:
goog_df.filter('Close>200' and 'Open>200').show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [102]:
goog_df.filter('Close>200' and 'Open>200').count()

1101

In [99]:
goog_df.filter(goog_df["Close"] < 200 and goog_df['Open'] > 200).show()

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [103]:
goog_df.filter((goog_df["Close"] < 200) and (goog_df['Open'] > 200)).show()

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [104]:
goog_df.filter((goog_df["Close"] < 200) & (goog_df['Open'] > 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [112]:
goog_df.filter( (goog_df["Close"] < 200)).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|        207.499996|            197.16|            197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|        205.500004|        198.699995|        199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|        202.199995|        190.250002|        192.060003|311488100|         24.883208|
|2010-02-01 00:00:00|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.3

In [114]:
goog_df.filter( (goog_df["Close"] < 200)).count()

661

In [113]:
goog_df.filter((goog_df['Open'] > 200)).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [115]:
goog_df.filter((goog_df['Open'] > 200)).count()

1101

In [120]:
goog_df.filter((goog_df["Close"] < 200) & (goog_df['Open'] > 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [117]:
goog_df.filter("Close < 200" and "Open > 200").count()

1101

In [125]:
#goog_df.createGlobalTempView('goog')
goog_df.createOrReplaceTempView("goog")

In [126]:
spaspark.sql("select * from goog").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [128]:
spark.sql("select * from goog where Close<200 and Open>200").show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



#### Conditional Operators

* | --> or
* & --> and
* ~ --> not (equivalent to ! in Python)

In [129]:
goog_df.filter(goog_df['Low']==197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [131]:
goog_df.filter('Low=197.16').show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [132]:
result = goog_df.filter(goog_df['Low']==197.16).collect()

In [133]:
type(result)

list

In [134]:
type(result[0])

pyspark.sql.types.Row

In [140]:
result[0]

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [141]:
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [142]:
row = result[0]

In [143]:
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [144]:
result[0].asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [147]:
for rows in result:
    rowval = rows.asDict()
    for item in rowval:
        print(item)

Date
Open
High
Low
Close
Volume
Adj Close


In [148]:
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [149]:
for item in row:
    print(item)

2010-01-22 00:00:00
206.78000600000001
207.499996
197.16
197.75
220441900
25.620401


In [152]:
spark.sql("select * from goog").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [153]:
spark.sql("select max(Open) from goog").show()

+----------+
| max(Open)|
+----------+
|702.409988|
+----------+



In [154]:
spark.sql("select max(Date), min(Open),sum(High), avg(Low) from goog").show()

+-------------------+---------+-----------------+-----------------+
|          max(Date)|min(Open)|        sum(High)|         avg(Low)|
+-------------------+---------+-----------------+-----------------+
|2016-12-30 00:00:00|     90.0|556635.6894849992|309.8282405079457|
+-------------------+---------+-----------------+-----------------+



In [155]:
spark.sql("select min(Date), min(Open),sum(High), avg(Low) from goog").show()

+-------------------+---------+-----------------+-----------------+
|          min(Date)|min(Open)|        sum(High)|         avg(Low)|
+-------------------+---------+-----------------+-----------------+
|2010-01-04 00:00:00|     90.0|556635.6894849992|309.8282405079457|
+-------------------+---------+-----------------+-----------------+



In [156]:
spark.sql("select count(*) from goog").show()

+--------+
|count(1)|
+--------+
|    1762|
+--------+



In [157]:
spark.sql("select count(*) from goog where Date>'2012-10-10'").show()

+--------+
|count(1)|
+--------+
|    1062|
+--------+



In [158]:
spark.sql("select count(*) from goog where Date>'2012-10-10' and Date <'2014-12-10'").show()

+--------+
|count(1)|
+--------+
|     543|
+--------+



In [161]:
spark.sql("select count(*) from goog where Date.between('2012-10-10' ,'2014-12-10')).show()

SyntaxError: unterminated string literal (detected at line 1) (1987917407.py, line 1)

### GroupBy, Agg

In [162]:
cemp_df = spark.read.csv(EMP_CSV,inferSchema=True,header=True)

In [163]:
cemp_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)
 |-- dept_id: string (nullable = true)



In [164]:
cemp_df.show()

+---+------+---+------+-------+
| id|  name|age|salary|dept_id|
+---+------+---+------+-------+
|  1|  Bill| 30| 10000|     D1|
|  2| Steve| 40| 12000|     D2|
|  3|Donald| 50| 14000|     D2|
|  4|  Modi| 60| 18000|     D1|
|  5|Sundar| 70| 22000|     D3|
|  6|  Jeff| 80| 22000|     D3|
|  7|Sergey| 90| 30000|     D3|
+---+------+---+------+-------+



In [165]:
cemp_df.groupBy("dept_id")

<pyspark.sql.group.GroupedData at 0x20ca4767a90>

In [167]:
cemp_df.groupBy("dept_id").count().show()

+-------+-----+
|dept_id|count|
+-------+-----+
|     D1|    2|
|     D3|    3|
|     D2|    2|
+-------+-----+



In [168]:
cemp_df.groupBy("dept_id").mean().show()

+-------+-------+--------+------------------+
|dept_id|avg(id)|avg(age)|       avg(salary)|
+-------+-------+--------+------------------+
|     D1|    2.5|    45.0|           14000.0|
|     D3|    6.0|    80.0|24666.666666666668|
|     D2|    2.5|    45.0|           13000.0|
+-------+-------+--------+------------------+



In [169]:
cemp_df.groupBy("dept_id").avg().show()

+-------+-------+--------+------------------+
|dept_id|avg(id)|avg(age)|       avg(salary)|
+-------+-------+--------+------------------+
|     D1|    2.5|    45.0|           14000.0|
|     D3|    6.0|    80.0|24666.666666666668|
|     D2|    2.5|    45.0|           13000.0|
+-------+-------+--------+------------------+



In [170]:
cemp_df.groupBy("dept_id").sum().show()

+-------+-------+--------+-----------+
|dept_id|sum(id)|sum(age)|sum(salary)|
+-------+-------+--------+-----------+
|     D1|      5|      90|      28000|
|     D3|     18|     240|      74000|
|     D2|      5|      90|      26000|
+-------+-------+--------+-----------+



In [171]:
cemp_df.groupBy("dept_id").max().show()

+-------+-------+--------+-----------+
|dept_id|max(id)|max(age)|max(salary)|
+-------+-------+--------+-----------+
|     D1|      4|      60|      18000|
|     D3|      7|      90|      30000|
|     D2|      3|      50|      14000|
+-------+-------+--------+-----------+



In [172]:
cemp_df.groupBy("dept_id").min().show()

+-------+-------+--------+-----------+
|dept_id|min(id)|min(age)|min(salary)|
+-------+-------+--------+-----------+
|     D1|      1|      30|      10000|
|     D3|      5|      70|      22000|
|     D2|      2|      40|      12000|
+-------+-------+--------+-----------+



In [173]:
cemp_df.createOrReplaceTempView('cemp')

In [175]:
spark.sql('select * from cemp').show()

+---+------+---+------+-------+
| id|  name|age|salary|dept_id|
+---+------+---+------+-------+
|  1|  Bill| 30| 10000|     D1|
|  2| Steve| 40| 12000|     D2|
|  3|Donald| 50| 14000|     D2|
|  4|  Modi| 60| 18000|     D1|
|  5|Sundar| 70| 22000|     D3|
|  6|  Jeff| 80| 22000|     D3|
|  7|Sergey| 90| 30000|     D3|
+---+------+---+------+-------+



In [177]:
spark.sql('select dept_id,count(*) from cemp group by dept_id').show()

+-------+--------+
|dept_id|count(1)|
+-------+--------+
|     D1|       2|
|     D3|       3|
|     D2|       2|
+-------+--------+



In [178]:
spark.sql('select dept_id,min(id), max(age),sum(salary),mean(salary) from cemp group by dept_id').show()

+-------+-------+--------+-----------+------------------+
|dept_id|min(id)|max(age)|sum(salary)|      mean(salary)|
+-------+-------+--------+-----------+------------------+
|     D1|      1|      60|      28000|           14000.0|
|     D3|      5|      90|      74000|24666.666666666668|
|     D2|      2|      50|      26000|           13000.0|
+-------+-------+--------+-----------+------------------+



In [179]:
spark.sql('select dept_id,min(id), max(age),sum(salary),mean(salary) from cemp group by dept_id order by dept_id desc').show()

+-------+-------+--------+-----------+------------------+
|dept_id|min(id)|max(age)|sum(salary)|      mean(salary)|
+-------+-------+--------+-----------+------------------+
|     D3|      5|      90|      74000|24666.666666666668|
|     D2|      2|      50|      26000|           13000.0|
|     D1|      1|      60|      28000|           14000.0|
+-------+-------+--------+-----------+------------------+



In [180]:
spark.range(1000 * 1000 * 1000).count()spark.sql('select dept_id,min(id), max(age),sum(salary),mean(salary) from cemp group by dept_id order by dept_id desc').show()

+-------+-------+--------+-----------+------------------+
|dept_id|min(id)|max(age)|sum(salary)|      mean(salary)|
+-------+-------+--------+-----------+------------------+
|     D3|      5|      90|      74000|24666.666666666668|
|     D2|      2|      50|      26000|           13000.0|
|     D1|      1|      60|      28000|           14000.0|
+-------+-------+--------+-----------+------------------+



In [181]:
spark.range(1000 * 1000 * 1000).count()

1000000000

In [201]:
spark.sql('select dept_id,min(id), max(age),sum(salary),mean(salary) as salary from cemp group by dept_id order by dept_id desc, salary asc').show()

+-------+-------+--------+-----------+------------------+
|dept_id|min(id)|max(age)|sum(salary)|            salary|
+-------+-------+--------+-----------+------------------+
|     D3|      5|      90|      74000|24666.666666666668|
|     D2|      2|      50|      26000|           13000.0|
|     D1|      1|      60|      28000|           14000.0|
+-------+-------+--------+-----------+------------------+



In [202]:
spark.sql('select dept_id,min(id), max(age),sum(salary) as ssalary,mean(salary) as salary from cemp group by dept_id order by dept_id desc, salary asc').show()

+-------+-------+--------+-------+------------------+
|dept_id|min(id)|max(age)|ssalary|            salary|
+-------+-------+--------+-------+------------------+
|     D3|      5|      90|  74000|24666.666666666668|
|     D2|      2|      50|  26000|           13000.0|
|     D1|      1|      60|  28000|           14000.0|
+-------+-------+--------+-------+------------------+



In [203]:
spark.sql('select dept_id,min(id), max(age),sum(salary) as ssalary,mean(salary) as msalary from cemp group by dept_id order by dept_id desc, ssalary asc').show()

+-------+-------+--------+-------+------------------+
|dept_id|min(id)|max(age)|ssalary|           msalary|
+-------+-------+--------+-------+------------------+
|     D3|      5|      90|  74000|24666.666666666668|
|     D2|      2|      50|  26000|           13000.0|
|     D1|      1|      60|  28000|           14000.0|
+-------+-------+--------+-------+------------------+



In [188]:
cemp_df.groupBy('dept_id').mean().show()

+-------+-------+--------+------------------+
|dept_id|avg(id)|avg(age)|       avg(salary)|
+-------+-------+--------+------------------+
|     D1|    2.5|    45.0|           14000.0|
|     D3|    6.0|    80.0|24666.666666666668|
|     D2|    2.5|    45.0|           13000.0|
+-------+-------+--------+------------------+



In [189]:
cemp_df.groupBy('dept_id').sum().show()

+-------+-------+--------+-----------+
|dept_id|sum(id)|sum(age)|sum(salary)|
+-------+-------+--------+-----------+
|     D1|      5|      90|      28000|
|     D3|     18|     240|      74000|
|     D2|      5|      90|      26000|
+-------+-------+--------+-----------+



In [190]:
cemp_df.groupBy('dept_id').sum("salary","age").show()

+-------+-----------+--------+
|dept_id|sum(salary)|sum(age)|
+-------+-----------+--------+
|     D1|      28000|      90|
|     D3|      74000|     240|
|     D2|      26000|      90|
+-------+-----------+--------+



### Alias name for aggregated measure columns

In below example, 'alias' method has no impact

In [208]:
cemp_df.groupBy('dept_id').mean("salary").alias('avgSal').show()

+-------+------------------+
|dept_id|       avg(salary)|
+-------+------------------+
|     D1|           14000.0|
|     D3|24666.666666666668|
|     D2|           13000.0|
+-------+------------------+



In [204]:
cemp_df.groupBy("dept_id").mean('salary').select('dept_id',func_col("avg(salary)").alias("avgSal")).show()

+-------+------------------+
|dept_id|            avgSal|
+-------+------------------+
|     D1|           14000.0|
|     D3|24666.666666666668|
|     D2|           13000.0|
+-------+------------------+



In [207]:
cemp_df.groupBy('dept_id').mean('salary').withColumnRenamed('avg(Salary)','avgSal').show()

+-------+------------------+
|dept_id|            avgSal|
+-------+------------------+
|     D1|           14000.0|
|     D3|24666.666666666668|
|     D2|           13000.0|
+-------+------------------+



In [209]:
cemp_df.groupBy("dept_id").sum("salary","age").mean('salary').show()

AttributeError: 'DataFrame' object has no attribute 'mean'

In [210]:
cemp_df.groupBy('dept_id').count().show()

+-------+-----+
|dept_id|count|
+-------+-----+
|     D1|    2|
|     D3|    3|
|     D2|    2|
+-------+-----+



In [211]:
cemp_df.groupBy("dept_id").max().show()

+-------+-------+--------+-----------+
|dept_id|max(id)|max(age)|max(salary)|
+-------+-------+--------+-----------+
|     D1|      4|      60|      18000|
|     D3|      7|      90|      30000|
|     D2|      3|      50|      14000|
+-------+-------+--------+-----------+



In [212]:
cemp_df.groupBy("dept_id").min().show()

+-------+-------+--------+-----------+
|dept_id|min(id)|min(age)|min(salary)|
+-------+-------+--------+-----------+
|     D1|      1|      30|      10000|
|     D3|      5|      70|      22000|
|     D2|      2|      40|      12000|
+-------+-------+--------+-----------+



In [213]:
cemp_df.groupBy("dept_id").sum().show()

+-------+-------+--------+-----------+
|dept_id|sum(id)|sum(age)|sum(salary)|
+-------+-------+--------+-----------+
|     D1|      5|      90|      28000|
|     D3|     18|     240|      74000|
|     D2|      5|      90|      26000|
+-------+-------+--------+-----------+



In [214]:
cemp_df.agg({'salary':'max'})

DataFrame[max(salary): int]

In [215]:
cemp_df.agg({'salary':'max'}).show()

+-----------+
|max(salary)|
+-----------+
|      30000|
+-----------+



In [216]:
groupd_cemp = cemp_df.groupBy("dept_id")

In [219]:
groupd_cemp.agg({'salary':'sum'}).show()

+-------+-----------+
|dept_id|sum(salary)|
+-------+-----------+
|     D1|      28000|
|     D3|      74000|
|     D2|      26000|
+-------+-----------+



In [220]:
grouped_cemp2 = cemp_df.groupBy(['dept_id','salary'])

In [221]:
grouped_cemp2.agg({'age':'max'}).show()

+-------+------+--------+
|dept_id|salary|max(age)|
+-------+------+--------+
|     D1| 18000|      60|
|     D2| 14000|      50|
|     D3| 30000|      90|
|     D3| 22000|      80|
|     D2| 12000|      40|
|     D1| 10000|      30|
+-------+------+--------+



In [222]:
grouped_cemp2.agg({'age':'max', 'id':'sum'})

DataFrame[dept_id: string, salary: int, sum(id): bigint, max(age): int]

In [223]:
grouped_cemp2.agg({'age':'max', 'id':'sum'}).show()

+-------+------+-------+--------+
|dept_id|salary|sum(id)|max(age)|
+-------+------+-------+--------+
|     D1| 18000|      4|      60|
|     D2| 14000|      3|      50|
|     D3| 30000|      7|      90|
|     D3| 22000|     11|      80|
|     D2| 12000|      2|      40|
|     D1| 10000|      1|      30|
+-------+------+-------+--------+



Multiple aggregations using **agg** along with **alias** on aggregate measures

In [225]:
from pyspark.sql.functions import max as smax, min as smin, count, countDistinct

In [226]:
grouped_cemp2.agg(smax("age").alias("maxAge"),
                  smin("age").alias('minAe'),
                  countDistinct("age").alias('distinctAge')).show()

+-------+------+------+-----+-----------+
|dept_id|salary|maxAge|minAe|distinctAge|
+-------+------+------+-----+-----------+
|     D1| 18000|    60|   60|          1|
|     D2| 14000|    50|   50|          1|
|     D3| 30000|    90|   90|          1|
|     D3| 22000|    80|   70|          2|
|     D2| 12000|    40|   40|          1|
|     D1| 10000|    30|   30|          1|
+-------+------+------+-----+-----------+



### Apply aggregation functions directly on grouped-data <font color='red'>without</font> using "agg"

In [229]:
grouped_cemp2.max('age','id').show()

+-------+------+--------+-------+
|dept_id|salary|max(age)|max(id)|
+-------+------+--------+-------+
|     D1| 18000|      60|      4|
|     D2| 14000|      50|      3|
|     D3| 30000|      90|      7|
|     D3| 22000|      80|      6|
|     D2| 12000|      40|      2|
|     D1| 10000|      30|      1|
+-------+------+--------+-------+



In [231]:
cemp_df.show()

+---+------+---+------+-------+
| id|  name|age|salary|dept_id|
+---+------+---+------+-------+
|  1|  Bill| 30| 10000|     D1|
|  2| Steve| 40| 12000|     D2|
|  3|Donald| 50| 14000|     D2|
|  4|  Modi| 60| 18000|     D1|
|  5|Sundar| 70| 22000|     D3|
|  6|  Jeff| 80| 22000|     D3|
|  7|Sergey| 90| 30000|     D3|
+---+------+---+------+-------+



In [232]:
grouped_cemp2.min('age','id').show()

+-------+------+--------+-------+
|dept_id|salary|min(age)|min(id)|
+-------+------+--------+-------+
|     D1| 18000|      60|      4|
|     D2| 14000|      50|      3|
|     D3| 30000|      90|      7|
|     D3| 22000|      70|      5|
|     D2| 12000|      40|      2|
|     D1| 10000|      30|      1|
+-------+------+--------+-------+



In [233]:
grouped_cemp2.max('age','id').withColumnRenamed('max(age)','max_age').withColumnRenamed('min(id)','min_id').show()

+-------+------+-------+-------+
|dept_id|salary|max_age|max(id)|
+-------+------+-------+-------+
|     D1| 18000|     60|      4|
|     D2| 14000|     50|      3|
|     D3| 30000|     90|      7|
|     D3| 22000|     80|      6|
|     D2| 12000|     40|      2|
|     D1| 10000|     30|      1|
+-------+------+-------+-------+



In [237]:
grouped_cemp2.max('age','id').withColumnRenamed('max(age)','max_age').show()

+-------+------+-------+-------+
|dept_id|salary|max_age|max(id)|
+-------+------+-------+-------+
|     D1| 18000|     60|      4|
|     D2| 14000|     50|      3|
|     D3| 30000|     90|      7|
|     D3| 22000|     80|      6|
|     D2| 12000|     40|      2|
|     D1| 10000|     30|      1|
+-------+------+-------+-------+



### Spark SQL Functions
Variety of functions that we can import from pyspark.sql.functions. For details refer:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [238]:
goog_df.select(dayofmonth(goog_df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [239]:
goog_df.select(dayofmonth('Date')).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [240]:
goog_df.select("Date").show()

+-------------------+
|               Date|
+-------------------+
|2010-01-04 00:00:00|
|2010-01-05 00:00:00|
|2010-01-06 00:00:00|
|2010-01-07 00:00:00|
|2010-01-08 00:00:00|
|2010-01-11 00:00:00|
|2010-01-12 00:00:00|
|2010-01-13 00:00:00|
|2010-01-14 00:00:00|
|2010-01-15 00:00:00|
|2010-01-19 00:00:00|
|2010-01-20 00:00:00|
|2010-01-21 00:00:00|
|2010-01-22 00:00:00|
|2010-01-25 00:00:00|
|2010-01-26 00:00:00|
|2010-01-27 00:00:00|
|2010-01-28 00:00:00|
|2010-01-29 00:00:00|
|2010-02-01 00:00:00|
+-------------------+
only showing top 20 rows



In [241]:
goog_df.select(dayofyear('Date')).show()

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
|             11|
|             12|
|             13|
|             14|
|             15|
|             19|
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
+---------------+
only showing top 20 rows



In [243]:
goog_df.select(year('Date')).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [244]:
goog_df.select(month('Date')).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [246]:
goog_df.select(hour('Date')).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [249]:
cemp_df.select(countDistinct("salary")).show()

+----------------------+
|count(DISTINCT salary)|
+----------------------+
|                     6|
+----------------------+



In [248]:
cemp_df.select(distinct("salary")).show()

NameError: name 'distinct' is not defined

In [255]:
spark.sql("select distinct salary from cemp").show()

+------+
|salary|
+------+
| 12000|
| 30000|
| 10000|
| 22000|
| 18000|
| 14000|
+------+



In [257]:
cemp_df.select(countDistinct("salary").alias("distinct salaries")).show()

+-----------------+
|distinct salaries|
+-----------------+
|                6|
+-----------------+



In [258]:
cemp_df.select(avg('salary')).show()

+------------------+
|       avg(salary)|
+------------------+
|18285.714285714286|
+------------------+



In [259]:
from pyspark.sql.functions import stddev
cemp_df.select(stddev("salary")).show()

+-------------------+
|stddev_samp(salary)|
+-------------------+
|  6969.320524371697|
+-------------------+



### Order By

In [261]:
cemp_df.orderBy("salary").show()

+---+------+---+------+-------+
| id|  name|age|salary|dept_id|
+---+------+---+------+-------+
|  1|  Bill| 30| 10000|     D1|
|  2| Steve| 40| 12000|     D2|
|  3|Donald| 50| 14000|     D2|
|  4|  Modi| 60| 18000|     D1|
|  5|Sundar| 70| 22000|     D3|
|  6|  Jeff| 80| 22000|     D3|
|  7|Sergey| 90| 30000|     D3|
+---+------+---+------+-------+



In [263]:
cemp_df.orderBy(cemp_df["salary"].desc()).show()

+---+------+---+------+-------+
| id|  name|age|salary|dept_id|
+---+------+---+------+-------+
|  7|Sergey| 90| 30000|     D3|
|  5|Sundar| 70| 22000|     D3|
|  6|  Jeff| 80| 22000|     D3|
|  4|  Modi| 60| 18000|     D1|
|  3|Donald| 50| 14000|     D2|
|  2| Steve| 40| 12000|     D2|
|  1|  Bill| 30| 10000|     D1|
+---+------+---+------+-------+



In [265]:
cemp_df.orderBy(cemp_df.salary.desc(), cemp_df.name.asc()).show()

+---+------+---+------+-------+
| id|  name|age|salary|dept_id|
+---+------+---+------+-------+
|  7|Sergey| 90| 30000|     D3|
|  6|  Jeff| 80| 22000|     D3|
|  5|Sundar| 70| 22000|     D3|
|  4|  Modi| 60| 18000|     D1|
|  3|Donald| 50| 14000|     D2|
|  2| Steve| 40| 12000|     D2|
|  1|  Bill| 30| 10000|     D1|
+---+------+---+------+-------+



### Saving DF as Parquet

In [267]:
emp_json_df.write.mode('overwrite').parquet(get_hdfs_filepath('emp.parquet'))

In [268]:
emp_json_df.show()

+---+---+------+------+-------+
|age| id|  name|salary| newsal|
+---+---+------+------+-------+
| 30|  1|  Bill| 10000|10500.0|
| 40|  2| Steve| 12000|12600.0|
| 50|  3|Donald| 14000|14700.0|
| 60|  4|  Modi| 18000|18900.0|
| 70|  5|Sunder| 22000|23100.0|
| 80|  6|  Jeff| 26000|27300.0|
| 90|  7|Sergey| 30000|31500.0|
+---+---+------+------+-------+



In [270]:
pemp_df = spark.read.parquet(get_hdfs_filepath('emp.parquet'))
pemp_df.head(3)

[Row(age=30, id=1, name='Bill', salary=10000, newsal=10500.0),
 Row(age=40, id=2, name='Steve', salary=12000, newsal=12600.0),
 Row(age=50, id=3, name='Donald', salary=14000, newsal=14700.0)]

In [271]:
from pyspark.sql import SQLContext
sc = spark.sparkContext
sql_ctx = SQLContext(sc)



In [273]:
sql_ctx.registerFunction('ucase',lambda val: val.upper())



<function __main__.<lambda>(val)>

In [275]:
spark.sql("select * from emp where salary>=18000").show()

+---+---+------+------+-------+
|age| id|  name|salary| newsal|
+---+---+------+------+-------+
| 60|  4|  Modi| 18000|18900.0|
| 70|  5|Sunder| 22000|23100.0|
| 80|  6|  Jeff| 26000|27300.0|
| 90|  7|Sergey| 30000|31500.0|
+---+---+------+------+-------+



In [279]:
spark.sql('select ucase(name) uname, age from emp where salary>=18000').show()

+------+---+
| uname|age|
+------+---+
|  MODI| 60|
|SUNDER| 70|
|  JEFF| 80|
|SERGEY| 90|
+------+---+

