In [37]:
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session registered under the app name "Basics".
spark = (
    SparkSession.builder
    .appName("Basics")
    .getOrCreate()
)
# Read the stock CSV: the first line supplies the column names (header=True)
# and the column types are inferred from the data (inferSchema=True).
df = spark.read.csv('stock.csv', header=True, inferSchema=True)
# Print the inferred column names and types.
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [38]:
# Preview the first 20 rows as a text table (show()'s defaults, spelled out).
df.show(n=20, truncate=True)

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

## Filter DataFrames

### Filter using SQL notation

In [39]:
# The predicate is a SQL expression string. The CSV columns are named
# "Open"/"Close", but the upper-case names used here still resolve, and the
# selected columns are displayed with the casing passed to select().
(
    df.filter("CLOSE < 500")
    .select("OPEN", "CLOSE")
    .show()
)

+------------------+------------------+
|              OPEN|             CLOSE|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



### Filter using Operations on a Column

In [40]:
# Same idea, but the predicate is built from a Column object (df["Close"])
# with a Python comparison operator instead of a SQL string.
(
    df.filter(df["Close"] > 700)
    .select("OPEN", "CLOSE")
    .show()
)

+----------+----------+
|      OPEN|     CLOSE|
+----------+----------+
|699.879997|701.910004|
|700.259979|702.100021|
|702.409988|700.089989|
+----------+----------+



### Filter using Operations on Multiple Columns

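Be careful to wrap each comparison in parentheses: in Python, `&`, `|` and `~` bind more tightly than comparison operators such as `<` and `>`, so unparenthesised conditions are grouped incorrectly.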

- And operator: `&`
- Or operator: `|`
- Not operator: `~`

In [41]:
# Each comparison is parenthesised before being combined with `&`:
# keep rows where Close is below 200 while Open is above 200.
df.filter(
    (df["Close"] < 200)
    & (df["Open"] > 200)
).show()

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



In [42]:
# `~` negates the second condition: keep rows where Close is below 200
# and Open is NOT above 200.
df.filter(
    (df["Close"] < 200)
    & ~(df["Open"] > 200)
).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.323710000000002|
|2010-02-08|        195.690006|197.88000300000002|      

### Collect Results as a List of Rows

In [43]:
# collect() returns the matching rows as a Python list instead of printing them.
# NOTE(review): this is an exact equality test on a double column; it
# presumably matches only because 197.16 is the literal value parsed from
# the CSV. Confirm before reusing this pattern on computed values.
result = (
    df.filter(df["Low"] == 197.16)
    .collect()
)

### Convert Row to Dictionary

In [44]:
# Convert the first collected Row into a plain dict keyed by column name
# (non-recursive, which is asDict()'s default).
row_dict = result[0].asDict(recursive=False)
# Look up a single field by its column name.
row_dict["Volume"]

220441900

## Grouping

In [48]:
# Session for the grouping/aggregation section.
# NOTE(review): a session was already created earlier in this notebook, so
# getOrCreate() presumably returns that same session and the new app name
# may not take effect. Confirm whether this rebinding is needed.
spark = (
    SparkSession.builder
    .appName("GroupsAndAggs")
    .getOrCreate()
)
# Read the sales CSV with a header row and inferred column types.
sales = spark.read.csv('sales.csv', header=True, inferSchema=True)
# Print the inferred column names and types.
sales.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [51]:
# Grouping by Company does not hand back a DataFrame; inspect the type of
# the intermediate object (groupBy is the canonical spelling of groupby).
type(sales.groupBy("Company"))

pyspark.sql.group.GroupedData

In [53]:
# Keep the grouped object so aggregations can be applied to it afterwards
# (groupBy is the canonical spelling of groupby).
group_by_co = sales.groupBy("Company")

## Aggregating