# Basic operations (pyspark with dataframes)

First, we are going to import the packages we need.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions

After that, we create the SparkSession.

In [3]:
# Obtain a SparkSession through the builder with default settings.
spark_session = SparkSession.builder.getOrCreate()

Once we have created the SparkSession we work on it to create a dataframe by reading the .csv file.

In [5]:
# Load the CSV file using its header row for column names and letting Spark
# infer the column types; the dataframe is persisted because every later
# cell reuses it.
data_frame = (
    spark_session.read
    .format("csv")
    .options(header="true", inferschema="true")
    .load("data/sample.csv")
    .persist()
)

Now let's show the dataframe and its schema.

In [6]:
# Print the column names and types first, then the rows of the dataframe.
data_frame.printSchema()
data_frame.show()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Weight: double (nullable = true)
 |-- HasACar: boolean (nullable = true)
 |-- DateBirth: timestamp (nullable = true)

+----+---+------+-------+-------------------+
|Name|Age|Weight|HasACar|          DateBirth|
+----+---+------+-------+-------------------+
|Luis| 15|  58.0|   true|2010-05-03 00:00:00|
|Lola| 25|  60.0|  false|1998-02-12 00:00:00|
|Paco| 32|  83.2|  false|1991-01-12 00:00:00|
+----+---+------+-------+-------------------+



Print the data types of the data frame.

In [7]:
# dtypes is a list of (column name, type name) pairs.
print(f"data types: {data_frame.dtypes}")

data types: [('Name', 'string'), ('Age', 'int'), ('Weight', 'double'), ('HasACar', 'boolean'), ('DateBirth', 'timestamp')]


Describe the dataframe.

In [8]:
# Show summary statistics (count, mean, stddev, min, max) of the dataframe.
data_frame.describe().show()

+-------+----+----------------+------------------+
|summary|Name|             Age|            Weight|
+-------+----+----------------+------------------+
|  count|   3|               3|                 3|
|   mean|null|            24.0| 67.06666666666666|
| stddev|null|8.54400374531753|14.007616975536322|
|    min|Lola|              15|              58.0|
|    max|Paco|              32|              83.2|
+-------+----+----------------+------------------+



Select all the values of a single column. We select "Name" in this case.

In [9]:
# Project the dataframe onto the single column "Name".
data_frame.select(data_frame["Name"]).show()

+----+
|Name|
+----+
|Luis|
|Lola|
|Paco|
+----+



Select columns name and age, but adding 1 to age.

In [11]:
# Arithmetic on a column yields a new derived column, shown as "(Age + 1)".
age_plus_one = data_frame["Age"] + 1
data_frame.select("name", age_plus_one).show()

+----+---------+
|name|(Age + 1)|
+----+---------+
|Luis|       16|
|Lola|       26|
|Paco|       33|
+----+---------+



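For each row, compute whether the name length is > 4. Note that `select` with a condition returns a boolean column rather than removing rows; to keep only the matching rows, use `filter` with the same condition.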

In [12]:
# The comparison yields a boolean column, so this shows one true/false value
# per row; it does not remove any rows.
name_is_long = functions.length(data_frame["Name"]) > 4
data_frame.select(name_is_long).show()

+------------------+
|(length(Name) > 4)|
+------------------+
|             false|
|             false|
|             false|
+------------------+



Select the names together with a boolean column indicating whether each name starts with L.

In [13]:
# Show each name next to a boolean column telling whether it starts with "L".
starts_with_l = data_frame["name"].startswith("L")
data_frame.select("name", starts_with_l).show()

+----+-------------------+
|name|startswith(name, L)|
+----+-------------------+
|Luis|               true|
|Lola|               true|
|Paco|              false|
+----+-------------------+



Add a new column "Senior" containing true if the person's age is > 45.

In [14]:
# Build the boolean condition first, then attach it as the "Senior" column.
# The result is only displayed; data_frame is not reassigned.
is_senior = data_frame["Age"] > 45
data_frame.withColumn("Senior", is_senior).show()

+----+---+------+-------+-------------------+------+
|Name|Age|Weight|HasACar|          DateBirth|Senior|
+----+---+------+-------+-------------------+------+
|Luis| 15|  58.0|   true|2010-05-03 00:00:00| false|
|Lola| 25|  60.0|  false|1998-02-12 00:00:00| false|
|Paco| 32|  83.2|  false|1991-01-12 00:00:00| false|
+----+---+------+-------+-------------------+------+



Rename column HasACar as Owner.

In [15]:
# Display a copy of the dataframe in which "HasACar" is called "Owner";
# data_frame is not reassigned.
renamed_frame = data_frame.withColumnRenamed("HasACar", "Owner")
renamed_frame.show()

+----+---+------+-----+-------------------+
|Name|Age|Weight|Owner|          DateBirth|
+----+---+------+-----+-------------------+
|Luis| 15|  58.0| true|2010-05-03 00:00:00|
|Lola| 25|  60.0|false|1998-02-12 00:00:00|
|Paco| 32|  83.2|false|1991-01-12 00:00:00|
+----+---+------+-----+-------------------+



Remove column DateBirth.

In [16]:
# Display the dataframe without the "DateBirth" column; data_frame is not
# reassigned.
without_date_birth = data_frame.drop("DateBirth")
without_date_birth.show()

+----+---+------+-------+
|Name|Age|Weight|HasACar|
+----+---+------+-------+
|Luis| 15|  58.0|   true|
|Lola| 25|  60.0|  false|
|Paco| 32|  83.2|  false|
+----+---+------+-------+



Get an RDD from the dataframe.

In [18]:
# Expose the dataframe as an RDD of Row objects and cache it, because the
# following cells run several actions on it.
rdd_from_dataframe = data_frame.rdd.cache()

# collect() returns all the rows as a local list, which is then printed
# one Row per line.
for row in rdd_from_dataframe.collect():
    print(row)

Row(Name='Luis', Age=15, Weight=58.0, HasACar=True, DateBirth=datetime.datetime(2010, 5, 3, 0, 0))
Row(Name='Lola', Age=25, Weight=60.0, HasACar=False, DateBirth=datetime.datetime(1998, 2, 12, 0, 0))
Row(Name='Paco', Age=32, Weight=83.2, HasACar=False, DateBirth=datetime.datetime(1991, 1, 12, 0, 0))


Sum all the weights (RDD).

In [19]:
# Sum the "Weight" field over all rows of the RDD.
# The field is read by name rather than by position, so the result does not
# silently change if the column order of the input file changes.
sum_of_weights = (
    rdd_from_dataframe
    .map(lambda row: row["Weight"])
    .reduce(lambda x, y: x + y)
)
print("Sum of weights (RDDs): " + str(sum_of_weights))

Sum of weights (RDDs): 201.2


Sum all the weights (dataframe).

In [21]:
# Three ways to total the "Weight" column with the dataframe API:
# 1) an explicit aggregate function inside select()
weight_total = functions.sum(data_frame["Weight"])
data_frame.select(weight_total).show()

# 2) agg() with a {column: aggregate} mapping
data_frame.agg({"Weight": "sum"}).show()

# 3) a global groupBy().sum(), collected locally; the total is the first
#    field of the single returned row
weight_sum_rows = data_frame.select("Weight").groupBy().sum().collect()
print(weight_sum_rows[0][0])

+-----------+
|sum(Weight)|
+-----------+
|      201.2|
+-----------+

+-----------+
|sum(Weight)|
+-----------+
|      201.2|
+-----------+

201.2


Get the mean age (RDD).

In [22]:
# Average the "Age" field over all rows of the RDD.
# RDD.mean() computes the mean directly, replacing the separate reduce() and
# count() actions; the field is read by name rather than by position so the
# result does not depend on the column order of the input file.
mean_age = rdd_from_dataframe.map(lambda row: row["Age"]).mean()

print("Mean age (RDDs): " + str(mean_age))

Mean age (RDDs): 24.0


Get the mean age (dataframe).

In [24]:
# Two ways to average the "Age" column with the dataframe API:
# an explicit aggregate function inside select() ...
average_age = functions.avg(data_frame["Age"])
data_frame.select(average_age).show()

# ... and agg() with a {column: aggregate} mapping.
data_frame.agg({"Age": "avg"}).show()

+--------+
|avg(Age)|
+--------+
|    24.0|
+--------+

+--------+
|avg(Age)|
+--------+
|    24.0|
+--------+



Write to a json file.

In [25]:
# Write the dataframe in JSON format to "output.json".
# mode("overwrite") replaces the output of a previous run; with the default
# save mode the write fails when the path already exists, which breaks
# re-running the notebook from a fresh kernel.
data_frame.write.mode("overwrite").save("output.json", format="json")

Write to a CSV file.

In [26]:
# Write the dataframe in CSV format to "output.csv".
# mode("overwrite") replaces the output of a previous run; with the default
# save mode the write fails when the path already exists, which breaks
# re-running the notebook from a fresh kernel.
data_frame.write.format("csv").mode("overwrite").save("output.csv")