# Spark DataFrames

#### Start a simple Spark Session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('dataframes').getOrCreate()

#### read a CSV file

In [2]:
df = spark.read.csv('walmart_stock.csv', inferSchema=True, header=True)

In [3]:
df.show(3)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows



#### define schema

In [4]:
# Optional: declare the schema explicitly instead of relying on inferSchema=True.
# NOTE(review): the two fields below are a generic illustration only; they do NOT match
# the columns of walmart_stock.csv (Date, Open, High, Low, Close, Volume, Adj Close).
# from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType

# data_schema = [StructField("age", IntegerType(), True), # True means the column is nullable
#                StructField("name", StringType(), True)]

# final_struc = StructType(fields=data_schema)

# df = spark.read.csv('walmart_stock.csv', header=True, schema=final_struc)

In [5]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [6]:
df.count() # number of rows

1258

#### What does the Schema look like?

In [7]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [8]:
df.head(2) # return the first two rows as a list of Row objects

[Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)]

In [9]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### Spark Row and Column objects

In [10]:
type(df["Open"])

pyspark.sql.column.Column

In [11]:
type(df.head())

pyspark.sql.types.Row

#### select, add, or rename columns

In [12]:
df.select(["Open"]).show(3)

+------------------+
|              Open|
+------------------+
|         59.970001|
|60.209998999999996|
|         59.349998|
+------------------+
only showing top 3 rows



In [13]:
df.withColumn("new", df["Open"]*2).show(3)

+-------------------+------------------+---------+---------+------------------+--------+------------------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|               new|
+-------------------+------------------+---------+---------+------------------+--------+------------------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|        119.940002|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|120.41999799999999|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|        118.699996|
+-------------------+------------------+---------+---------+------------------+--------+------------------+------------------+
only showing top 3 rows



In [14]:
df.withColumnRenamed("Open", "new_Open").show(3)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|          new_Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 3 rows



#### filter data

In [26]:
# Pass the whole condition as a single SQL expression string
df.filter("Open > 60 and Open < 80").show(2)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-20 00:00:00|             60.75|    61.25|60.669998|61.009997999999996|10378800|53.212320999999996|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 2 rows



In [31]:
# To pass multiple conditions to filter or where use Column objects and logical operators (&, |, ~)
from pyspark.sql.functions import col
df.filter((col("Open") > 60) & (col("Open") < 100)).show(2)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-20 00:00:00|             60.75|    61.25|60.669998|61.009997999999996|10378800|53.212320999999996|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 2 rows



In [32]:
df.filter((df["Open"] > 60) & (df["Open"] < 100)).show(2)

+-------------------+------------------+---------+---------+------------------+--------+------------------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-20 00:00:00|             60.75|    61.25|60.669998|61.009997999999996|10378800|53.212320999999996|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
only showing top 2 rows



In [37]:
df.filter((df["Open"] > 60) & (df["Open"] < 100)).select(["Date", "Open"]).show(2)

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2012-01-04 00:00:00|60.209998999999996|
|2012-01-20 00:00:00|             60.75|
+-------------------+------------------+
only showing top 2 rows



#### get the data from specific rows

In [33]:
result = df.filter(df["Open"] == 60.75).collect()
print(type(result))
result

<class 'list'>


[Row(Date=datetime.datetime(2012, 1, 20, 0, 0), Open=60.75, High=61.25, Low=60.669998, Close=61.009997999999996, Volume=10378800, Adj Close=53.212320999999996),
 Row(Date=datetime.datetime(2012, 1, 24, 0, 0), Open=60.75, High=62.0, Low=60.75, Close=61.389998999999996, Volume=7362800, Adj Close=53.54375400000001)]

In [34]:
row = result[0]
row.asDict()

{'Date': datetime.datetime(2012, 1, 20, 0, 0),
 'Open': 60.75,
 'High': 61.25,
 'Low': 60.669998,
 'Close': 61.009997999999996,
 'Volume': 10378800,
 'Adj Close': 53.212320999999996}

#### convert column type

In [73]:
from pyspark.sql.types import IntegerType, StringType, FloatType
df.withColumn("Open_int", df["Open"].cast(IntegerType())).show(3)

+-------------------+------------------+---------+---------+------------------+--------+------------------+--------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|Open_int|
+-------------------+------------------+---------+---------+------------------+--------+------------------+--------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|      59|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|      60|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|      59|
+-------------------+------------------+---------+---------+------------------+--------+------------------+--------+
only showing top 3 rows



In [76]:
from pyspark.sql.functions import udf

def float_to_int(x):
    """Truncate a number toward zero, for use as an IntegerType Spark UDF.

    Behaves like Python's ``int()`` (59.97 -> 59, -1.5 -> -1). It returns None
    instead of raising for inputs that have no integer value:
    - Spark passes SQL NULL into a Python UDF as None.
    - A double column may also contain NaN or +/-inf.
    Returning None makes Spark write NULL for those rows rather than failing
    the whole task.

    Args:
        x: A number, or None.

    Returns:
        int, or None when ``x`` is None, NaN or infinite.
    """
    if x is None:
        return None
    try:
        return int(x)
    except (ValueError, OverflowError):
        # int(nan) raises ValueError; int(inf) / int(-inf) raise OverflowError.
        return None

udffloat_to_int = udf(float_to_int, IntegerType())

df.withColumn("Open_int", udffloat_to_int("Open")).show(3)

+-------------------+------------------+---------+---------+------------------+--------+------------------+--------+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|Open_int|
+-------------------+------------------+---------+---------+------------------+--------+------------------+--------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|      59|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|      60|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|      59|
+-------------------+------------------+---------+---------+------------------+--------+------------------+--------+
only showing top 3 rows



#### add a new column using a constant or a function (UDF)

In [44]:
from pyspark.sql.functions import lit
df.withColumn("x4", lit(0)).show(2)

+-------------------+------------------+---------+---------+------------------+--------+------------------+---+
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close| x4|
+-------------------+------------------+---------+---------+------------------+--------+------------------+---+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|  0|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|  0|
+-------------------+------------------+---------+---------+------------------+--------+------------------+---+
only showing top 2 rows



In [60]:
def square(x):
    """Return ``x * x``, or None when ``x`` is None.

    This function is wrapped by the square_udf_* UDFs. Spark passes SQL NULL
    into a Python UDF as None. ``None * None`` would raise TypeError and fail
    the job, so None is propagated instead (Spark stores it as NULL).
    """
    if x is None:
        return None
    return x * x

In [65]:
# Register square() twice with different declared return types.
# NOTE(review): Spark does not convert the Python result to the declared type. A
# mismatch (e.g. a float returned where IntegerType is declared) is expected to come
# back as null. That is presumably why In [78] casts Open to IntegerType before
# calling square_udf_int.
square_udf_int = udf(lambda z: square(z), IntegerType())
square_udf_float = udf(lambda z: square(z), FloatType())

In [78]:
df.select("Open", square_udf_int(df["Open"].cast(IntegerType())).alias("SQR_int"), square_udf_float("Open").alias("SQR_float")).show(3)

+------------------+-------+---------+
|              Open|SQR_int|SQR_float|
+------------------+-------+---------+
|         59.970001|   3481|3596.4011|
|60.209998999999996|   3600| 3625.244|
|         59.349998|   3481|3522.4224|
+------------------+-------+---------+
only showing top 3 rows



In [84]:
def group(x, bounds=(50, 60, 70, 80), labels=("A", "B", "C", "D", "E")):
    """Bucket a numeric value into a label using half-open ranges.

    With the default arguments:
        x < 50        -> "A"
        50 <= x < 60  -> "B"
        60 <= x < 70  -> "C"
        70 <= x < 80  -> "D"
        x >= 80       -> "E"

    Args:
        x: Value to bucket. None (SQL NULL) and NaN both yield None. Without
            this, None would raise TypeError and NaN would fail every ``<``
            test and be silently labelled as the top bucket.
        bounds: Ascending, exclusive upper bounds of every bucket except the last.
        labels: One label per bucket; must have ``len(bounds) + 1`` entries.

    Returns:
        The label for ``x``, or None for None/NaN input.

    Raises:
        ValueError: if ``labels`` does not have ``len(bounds) + 1`` entries.
    """
    from bisect import bisect_right

    if len(labels) != len(bounds) + 1:
        raise ValueError("labels must have exactly len(bounds) + 1 entries")
    if x is None or x != x:  # x != x is True only for NaN
        return None
    # bisect_right counts how many bounds are <= x, which is exactly the
    # index of the first bucket whose upper bound is still > x.
    return labels[bisect_right(bounds, x)]
    
udf_group = udf(lambda z: group(z), StringType())

In [93]:
da = df.select("Open", "Close", udf_group("Open").alias("Group"))

#### group by

In [94]:
da.groupby("Group").count().show()

+-----+-----+
|Group|count|
+-----+-----+
|    E|  112|
|    B|   82|
|    D|  748|
|    C|  316|
+-----+-----+



In [95]:
da.groupby("Group").mean().show()

+-----+------------------+------------------+
|Group|         avg(Open)|        avg(Close)|
+-----+------------------+------------------+
|    E| 84.31178542857143|  84.3377680982143|
|    B|58.917926902439035|58.973170731707306|
|    D|  74.8132084759358| 74.82470592379677|
|    C| 65.79655097468358| 65.86759499050635|
+-----+------------------+------------------+



In [99]:
da.groupby("Group").agg({"Open":"max", "Close":"min"}).show()

+-----+----------+---------+
|Group|min(Close)|max(Open)|
+-----+----------+---------+
|    E| 79.739998|90.800003|
|    B| 56.419998|59.970001|
|    D| 68.940002|79.839996|
|    C|     58.98|69.980003|
+-----+----------+---------+



In [100]:
# NOTE: this import shadows Python's built-in max() and min() for the rest of the
# notebook. Plain Python calls such as max(a, b) will now go to the Spark functions.
# `import pyspark.sql.functions as F` (then F.max / F.min) avoids the clash.
from pyspark.sql.functions import max, min, mean

da.groupby("Group").agg(max(da["Open"]).alias("max"), 
                        min(da["Close"]).alias("min")).show()

+-----+---------+---------+
|Group|      max|      min|
+-----+---------+---------+
|    E|90.800003|79.739998|
|    B|59.970001|56.419998|
|    D|79.839996|68.940002|
|    C|69.980003|    58.98|
+-----+---------+---------+



In [102]:
from functools import reduce

# Aggregating with a {column: function} dict makes Spark name each result
# "<function>(<column>)", e.g. "max(Open)".
columns_and_operations = {"Open": "max", "Close": "min"}

dg = da.groupby("Group").agg(columns_and_operations)

# Pair each generated name with the plain column name it should be renamed to.
old_names = ["{}({})".format(func, column) for column, func in columns_and_operations.items()]
new_names = list(columns_and_operations)

# Fold over (old, new) pairs, applying one withColumnRenamed per pair.
reduce(lambda frame, names: frame.withColumnRenamed(*names), zip(old_names, new_names), dg).show()

+-----+---------+---------+
|Group|    Close|     Open|
+-----+---------+---------+
|    E|79.739998|90.800003|
|    B|56.419998|59.970001|
|    D|68.940002|79.839996|
|    C|    58.98|69.980003|
+-----+---------+---------+



#### get max, min, or any statistic from a column

In [103]:
da.agg(max(df.Open)).collect()[0]

Row(max(Open)=90.800003)

In [108]:
# agg() without groupBy yields exactly one row; take its first (only) field.
da.agg(max(df.Open)).collect()[0][0]

90.800003

In [110]:
from pyspark.sql.functions import countDistinct, avg, stddev
da.select(avg('Open').alias('Avg')).show()

+-----------------+
|              Avg|
+-----------------+
|72.35785375357709|
+-----------------+



In [113]:
# Read the single aggregate value by its alias instead of via asDict().
da.select(avg('Open').alias('Avg')).collect()[0]['Avg']

72.35785375357709

In [116]:
from pyspark.sql.functions import format_number
# format_number produces a string column, so the collected value is a str ('72.358').
da.select(avg('Open').alias('Avg')).select(format_number('Avg', 3)).collect()[0][0]

'72.358'

#### sort by a column

In [118]:
df.orderBy('Open').show(5)

+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|               Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+--------+------------------+
|2015-11-16 00:00:00|56.389998999999996|         58.029999|56.360001000000004|         57.869999|13321600|55.362759999999994|
|2015-11-13 00:00:00|56.740002000000004|         57.060001|         56.299999|         56.419998|12514900|53.975581000000005|
|2015-11-02 00:00:00|57.290001000000004|57.610001000000004|             56.77|57.610001000000004|10719200|         55.114026|
|2015-11-03 00:00:00|             57.57|         58.330002|         57.529999|58.110001000000004|10253900|         55.592364|
|2012-04-26 00:00:00|             57.59|             59.43|             57.57|         58.950001|25092900|         51.

In [119]:
df.orderBy(df['Open'].desc()).show(5)

+-------------------+-----------------+-----------------+---------+---------+--------+-----------------+
|               Date|             Open|             High|      Low|    Close|  Volume|        Adj Close|
+-------------------+-----------------+-----------------+---------+---------+--------+-----------------+
|2015-01-13 00:00:00|        90.800003|        90.970001|    88.93|89.309998| 8215400|        83.825448|
|2015-01-09 00:00:00|            90.32|        90.389999|    89.25|89.349998| 8522500|        83.862993|
|2015-01-12 00:00:00|        89.360001|        90.309998|89.220001|90.019997| 7372500|        84.491846|
|2015-01-08 00:00:00|        89.209999|90.66999799999999|    89.07|90.470001|12713600|84.91421600000001|
|2015-01-23 00:00:00|88.41999799999999|        89.260002|87.889999|88.510002| 7565800|83.07458100000001|
+-------------------+-----------------+-----------------+---------+---------+--------+-----------------+
only showing top 5 rows



#### Join

In [120]:
valuesA = [('Pirate',1),('Monkey',2),('Ninja',3),('Spaghetti',4)]
TableA = spark.createDataFrame(valuesA,['name','id'])
 
valuesB = [('Rutabaga',1),('Pirate',2),('Ninja',3),('Darth Vader',4)]
TableB = spark.createDataFrame(valuesB,['name','id'])
 
TableA.show()
TableB.show()

+---------+---+
|     name| id|
+---------+---+
|   Pirate|  1|
|   Monkey|  2|
|    Ninja|  3|
|Spaghetti|  4|
+---------+---+

+-----------+---+
|       name| id|
+-----------+---+
|   Rutabaga|  1|
|     Pirate|  2|
|      Ninja|  3|
|Darth Vader|  4|
+-----------+---+



In [121]:
ta = TableA.alias('ta')
tb = TableB.alias('tb')

In [122]:
# inner join
ta.join(tb, ta.name == tb.name).show()

+------+---+------+---+
|  name| id|  name| id|
+------+---+------+---+
| Ninja|  3| Ninja|  3|
|Pirate|  1|Pirate|  2|
+------+---+------+---+



In [124]:
ta.join(tb, ta.name == tb.name, how="left").show()

+---------+---+------+----+
|     name| id|  name|  id|
+---------+---+------+----+
|Spaghetti|  4|  null|null|
|    Ninja|  3| Ninja|   3|
|   Pirate|  1|Pirate|   2|
|   Monkey|  2|  null|null|
+---------+---+------+----+



In [127]:
ta.join(tb, ta.name == tb.name, how="left").select(ta.name, ta.id, tb.id.alias("new")).show()

+---------+---+----+
|     name| id| new|
+---------+---+----+
|Spaghetti|  4|null|
|    Ninja|  3|   3|
|   Pirate|  1|   2|
|   Monkey|  2|null|
+---------+---+----+



In [132]:
dn = ta.join(tb, ta.name == tb.name, how='full').select(ta.name, ta.id, tb.name.alias("name2"), tb.id.alias("id2"))
dn.show()

+---------+----+-----------+----+
|     name|  id|      name2| id2|
+---------+----+-----------+----+
|     null|null|   Rutabaga|   1|
|Spaghetti|   4|       null|null|
|    Ninja|   3|      Ninja|   3|
|   Pirate|   1|     Pirate|   2|
|   Monkey|   2|       null|null|
|     null|null|Darth Vader|   4|
+---------+----+-----------+----+



#### work with null values

In [134]:
dn.na.drop().show()

+------+---+------+---+
|  name| id| name2|id2|
+------+---+------+---+
| Ninja|  3| Ninja|  3|
|Pirate|  1|Pirate|  2|
+------+---+------+---+



In [135]:
dn.na.drop(thresh=2).show()

+---------+----+-----------+----+
|     name|  id|      name2| id2|
+---------+----+-----------+----+
|     null|null|   Rutabaga|   1|
|Spaghetti|   4|       null|null|
|    Ninja|   3|      Ninja|   3|
|   Pirate|   1|     Pirate|   2|
|   Monkey|   2|       null|null|
|     null|null|Darth Vader|   4|
+---------+----+-----------+----+



In [137]:
dn.na.drop(how='any').show() # how='all'

+---------+----+-----------+----+
|     name|  id|      name2| id2|
+---------+----+-----------+----+
|     null|null|   Rutabaga|   1|
|Spaghetti|   4|       null|null|
|    Ninja|   3|      Ninja|   3|
|   Pirate|   1|     Pirate|   2|
|   Monkey|   2|       null|null|
|     null|null|Darth Vader|   4|
+---------+----+-----------+----+



In [141]:
dn.na.drop(subset=["name2", "id2"]).show()

+------+----+-----------+---+
|  name|  id|      name2|id2|
+------+----+-----------+---+
|  null|null|   Rutabaga|  1|
| Ninja|   3|      Ninja|  3|
|Pirate|   1|     Pirate|  2|
|  null|null|Darth Vader|  4|
+------+----+-----------+---+



In [145]:
# Mean of the non-null ids (4, 3, 1, 2 -> 2.5).
mean_value = dn.select(mean(dn["id"])).collect()[0][0]
# NOTE: id is an integer column, so na.fill casts the float mean to an integer;
# the nulls become 2, not 2.5, in the output below.
dn.na.fill(subset="id", value=mean_value).show()

+---------+---+-----------+----+
|     name| id|      name2| id2|
+---------+---+-----------+----+
|     null|  2|   Rutabaga|   1|
|Spaghetti|  4|       null|null|
|    Ninja|  3|      Ninja|   3|
|   Pirate|  1|     Pirate|   2|
|   Monkey|  2|       null|null|
|     null|  2|Darth Vader|   4|
+---------+---+-----------+----+



In [159]:
from pyspark.sql.functions import isnan, when, count, col

dn.select([count(when(col(c).isNull(), c)).alias(c) for c in dn.columns]).show()

+----+---+-----+---+
|name| id|name2|id2|
+----+---+-----+---+
|   2|  2|    2|  2|
+----+---+-----+---+



In [151]:
dn.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dn.columns]).show()

+----+---+-----+---+
|name| id|name2|id2|
+----+---+-----+---+
|   2|  2|    2|  2|
+----+---+-----+---+



In [152]:
[count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dn.columns]

[Column<b'count(CASE WHEN (isnan(name) OR (name IS NULL)) THEN name END) AS `name`'>,
 Column<b'count(CASE WHEN (isnan(id) OR (id IS NULL)) THEN id END) AS `id`'>,
 Column<b'count(CASE WHEN (isnan(name2) OR (name2 IS NULL)) THEN name2 END) AS `name2`'>,
 Column<b'count(CASE WHEN (isnan(id2) OR (id2 IS NULL)) THEN id2 END) AS `id2`'>]

In [156]:
dn.select(dn["name"].isNull()).show()

+--------------+
|(name IS NULL)|
+--------------+
|          true|
|         false|
|         false|
|         false|
|         false|
|          true|
+--------------+



#### median and quartiles

In [165]:
from pyspark.sql.functions import array

In [171]:
# approxQuantile(column, probabilities, relativeError):
#   - column: name of a numeric column
#   - probabilities: quantiles to compute; here 25%, 50% (50% is the median) and 75%
#   - relativeError: 0.0 requests exact quantiles, which can be expensive on large data
# The results are values taken from the column rather than interpolated, so they can
# differ slightly from pandas' describe() further below (68.620003 vs 68.627503 for 25%).
df.stat.approxQuantile("Open", [0.25, 0.5, 0.75],0.0)

[68.620003, 73.230003, 76.629997]

#### concatenate rows (union)

In [176]:
df1 = df.filter(df["Open"] > 70)

In [178]:
print(df.count(), df1.count())

1258 858


In [179]:
df2 = df.union(df1)

In [180]:
df2.count()

2116

#### drop duplicates

In [184]:
df2.drop_duplicates(subset=["Open"]).count()

957

In [185]:
df2.drop_duplicates().count()

1258

#### date/time functions, conversion to pandas, and caching

In [186]:
from pyspark.sql.functions import dayofmonth, dayofyear, hour, month, year

In [172]:
dp = df.toPandas()

In [173]:
dp.describe()

Unnamed: 0,Open,High,Low,Close,Volume,Adj Close
count,1258.0,1258.0,1258.0,1258.0,1258.0,1258.0
mean,72.357854,72.839388,71.918601,72.38845,8222093.0,67.238838
std,6.76809,6.768187,6.744076,6.756859,4519781.0,6.722609
min,56.389999,57.060001,56.299999,56.419998,2094900.0,50.363689
25%,68.627503,69.059998,68.162502,68.632497,5791100.0,63.778335
50%,73.235,73.725002,72.839996,73.265,7093500.0,68.541162
75%,76.629997,77.094999,76.25,76.709999,9394675.0,71.105668
max,90.800003,90.970001,89.25,90.470001,80898100.0,84.914216


In [188]:
df.select(min(df["Open"])).show()

+------------------+
|         min(Open)|
+------------------+
|56.389998999999996|
+------------------+



In [189]:
df.cache()

DataFrame[Date: timestamp, Open: double, High: double, Low: double, Close: double, Volume: int, Adj Close: double]

In [191]:
df.storageLevel

StorageLevel(True, True, False, True, 1)