# Basic Operations

This lecture will cover some basic operations with Spark DataFrames.

We will play around with some stock data from Apple.

In [1]:
from pyspark.sql import SparkSession

Creating a SparkSession object is the entry point for your Spark code. Each time a Spark job runs, the Spark driver relies on the SparkSession to divide the job into tasks, assign those tasks to workers, and collect the results from them.

In [3]:
# May take a while locally
spark = SparkSession.builder.appName("DemoOperations").getOrCreate()

24/08/15 10:11:16 WARN Utils: Your hostname, quangnx7-Inspiron-16-Plus-7630 resolves to a loopback address: 127.0.1.1; using 192.168.1.13 instead (on interface wlp0s20f3)
24/08/15 10:11:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/15 10:11:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Read CSV

In [6]:
# Let Spark know about the header and infer the Schema types!
df = spark.read.csv("appl_stock.csv", inferSchema=True, header=True)

Print schema

When you read a CSV with `inferSchema=True`, Spark infers each column's data type, and `printSchema()` shows the schema it assigned to the DataFrame.

In [8]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



## Filtering Data

Filter Close > 500

Remember that Spark is lazy: transformations such as `filter()` do not run until you call an action. In the code below, `show()` is the action.

In [11]:
df.filter("Close>500").show()

+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|      Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.703738|
|2012-02-22|        513.079994|        515.489983|509.0700230000

Filter on Close, then select columns

In [12]:
df.filter("Close>500").select("Open").show()

+------------------+
|              Open|
+------------------+
|        499.529991|
|        504.659988|
|        491.500008|
|        503.109993|
|506.88001299999996|
|        513.079994|
|        515.079987|
| 519.6699980000001|
|        521.309982|
|        527.960014|
| 541.5600049999999|
|        548.169983|
|        544.240013|
|        545.420013|
|        523.659996|
| 536.8000030000001|
| 534.6899950000001|
|        544.209999|
| 548.9799879999999|
|        557.540024|
+------------------+
only showing top 20 rows



In [13]:
df.filter("Close>500").select(["Open", "Close"]).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        499.529991|502.60002099999997|
|        504.659988|        509.459991|
|        491.500008|502.20999900000004|
|        503.109993|         502.12001|
|506.88001299999996|        514.850021|
|        513.079994|        513.039993|
|        515.079987| 516.3899769999999|
| 519.6699980000001| 522.4099809999999|
|        521.309982|        525.760017|
|        527.960014|        535.410011|
| 541.5600049999999|        542.440025|
|        548.169983| 544.4699780000001|
|        544.240013|        545.180008|
|        545.420013| 533.1600269999999|
|        523.659996|        530.259987|
| 536.8000030000001| 530.6900099999999|
| 534.6899950000001|        541.989975|
|        544.209999|        545.170021|
| 548.9799879999999|        551.999977|
|        557.540024|        568.099998|
+------------------+------------------+
only showing top 20 rows



Another way to filter is to use normal Python comparison operators. They look very similar to SQL operators, except that you must reference the entire column from the DataFrame, using the format `df["column name"]`.

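Let's see some examples. Combine conditions with:

- `&` : and
- `|` : or
- `~` : not

Wrap each condition in parentheses, because Python's `&`, `|`, and `~` bind more tightly than comparison operators such as `>` and `<`.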

In [20]:
df.filter((df["Open"] > 100) & (df["Close"] < 200)).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-22|206.78000600000001|        207.499996|            197.16|            197.75|220441900|         25.620401|
|2010-01-28|        204.930004|        205.500004|        198.699995|        199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|        202.199995|        190.250002|        192.060003|311488100|         24.883208|
|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03|        195.169994|        200.200003|      

In [22]:
df.filter((df["Open"] < 100) | ~(df["Close"] < 200)).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [24]:
df.filter(df["Close"] == 210.58).show()

+----------+------+----------+----------+------+---------+---------+
|      Date|  Open|      High|       Low| Close|   Volume|Adj Close|
+----------+------+----------+----------+------+---------+---------+
|2010-01-07|211.75|212.000006|209.050005|210.58|119282800| 27.28265|
+----------+------+----------+----------+------+---------+---------+



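`collect()` is an action that returns the DataFrame's rows to the driver as a Python list of `Row` objects. You can then hand this list to other, third-party code. Because `collect()` pulls every row into driver memory, use it only on small results such as this filtered DataFrame.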

In [25]:
result = df.filter(df["Close"] == 210.58).collect()

In [26]:
result

[Row(Date=datetime.date(2010, 1, 7), Open=211.75, High=212.000006, Low=209.050005, Close=210.58, Volume=119282800, Adj Close=27.28265)]

In [28]:
type(result[0])

pyspark.sql.types.Row

In [30]:
result[0].asDict()

{'Date': datetime.date(2010, 1, 7),
 'Open': 211.75,
 'High': 212.000006,
 'Low': 209.050005,
 'Close': 210.58,
 'Volume': 119282800,
 'Adj Close': 27.28265}

In [31]:
for item in result[0]:
    print(item)

2010-01-07
211.75
212.000006
209.050005
210.58
119282800
27.28265


# Enrich schema

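In Spark, you can assign an explicit schema to a DataFrame when reading data, which is useful for semi-structured formats such as JSON. Without one, Spark infers the types itself; here `age` is inferred as `long`.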

In [32]:
df1 = spark.read.json("people.json")

In [33]:
df1.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [34]:
df1.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



To supply your own schema:

- Define a `StructField` for each column with its name, data type (e.g. `StringType`, `IntegerType`), and whether it is nullable.
- Group all the `StructField`s into a list, then pass that list to `StructType`.
- Pass the `StructType` to the reader through its `schema` parameter.

In [35]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [36]:
data_schema = [StructField("age", IntegerType(), True), StructField("name", StringType(), True)]

In [38]:
final_type = StructType(fields=data_schema)

In [39]:
df2 = spark.read.json("people.json", schema=final_type)

In [40]:
df2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



# Spark Aggregate

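You can aggregate data with `groupBy()`. It returns a `GroupedData` object rather than a DataFrame; calling an aggregate method such as `mean()` on it produces a DataFrame.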

In [5]:
df3 = spark.read.csv("sales_info.csv", inferSchema=True, header=True)

In [6]:
df3.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [43]:
df3.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [44]:
df3.groupBy("Company")

<pyspark.sql.group.GroupedData at 0x727f8d334c70>

In [45]:
df3.groupBy("Company").mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [48]:
from pyspark.sql.functions import format_number
from pyspark.sql.functions import count_distinct, stddev

In [52]:
df3_std = df3.select(stddev("Sales").alias("std_sales"))  # sample standard deviation
df3_std.select(format_number("std_sales", 2)).show()  # show() returns None, so don't reassign

+---------------------------+
|format_number(std_sales, 2)|
+---------------------------+
|                     250.09|
+---------------------------+



# SparkSQL

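Spark SQL lets you transform data with SQL syntax: register a DataFrame as a temporary view with `createOrReplaceTempView()`, then query that view by name with `spark.sql()`, which returns a new DataFrame.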

In [7]:
df3.createOrReplaceTempView("sales_info")

In [12]:
sql_stmt = """select Company, Person, Sales as Max_sales
from (
    select Company, Person, Sales,
        dense_rank() over(partition by Company order by Sales desc) as rank_sales
    from sales_info
)
where rank_sales = 1
"""

sql_result = spark.sql(sql_stmt)

In [13]:
sql_result

DataFrame[Company: string, Person: string, Max_sales: double]

In [14]:
sql_result.show()

+-------+------+---------+
|Company|Person|Max_sales|
+-------+------+---------+
|   APPL|  Mike|    750.0|
|     FB|  Carl|    870.0|
|   GOOG| Frank|    340.0|
|   MSFT|  Tina|    600.0|
+-------+------+---------+

