In [1]:
%%bash

echo Hello, try to find the environments before running
echo $(pwd)
echo $VIRTUAL_ENV
echo $SPARK_HOME

Hello, try to find the environments before running
/Users/kiang/Desktop/PythonProjects/Repos/ApacheSparkLearning/LearningSparkV2/chapter3/py/src
/Users/kiang/Desktop/PythonProjects/Repos/ApacheSparkLearning/sparklearning_env



# The *DataFrame* API

In [2]:
import os
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pathlib import Path 

StructTypes are a Seq of Struct Fields. 2 Names with the same name are not allowed.
https://spark.apache.org/docs/latest/sql-ref-datatypes.html

ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType. containsNull is used to indicate if elements in a ArrayType value can have null values.

MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys is described by keyType and the data type of values is described by valueType. For a MapType value, keys are not allowed to have null values. valueContainsNull is used to indicate if values of a MapType value can have null values.

In [3]:
spark = (SparkSession
       .builder
       .appName("Aggregations and Computations in Spark Exercise - Chapter 3")
       .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/16 20:19:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sf_fire_datasets = "/Users/kiang/Desktop/PythonProjects/Repos/ApacheSparkLearning/LearningSparkV2/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

In [5]:
#StructType Parameter with a List of StructField() methods per column.
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
              StructField('UnitID', StringType(), True),
              StructField('IncidentNumber', IntegerType(), True),
              StructField('CallType', StringType(), True),
              StructField('CallDate', StringType(), True),
              StructField('WatchDate', StringType(), True),
              StructField('CallFinalDisposition', StringType(), True),
              StructField('AvailableDtTm', StringType(), True),
              StructField('Address', StringType(), True),
              StructField('City', StringType(), True),
              StructField('Zipcode', IntegerType(), True),
              StructField('Battalion', StringType(), True),
              StructField('StationArea', StringType(), True),
              StructField('Box', StringType(), True),
              StructField('OriginalPriority', StringType(), True),
              StructField('Priority', StringType(), True),
              StructField('FinalPriority', IntegerType(), True),
              StructField('ALSUnit', BooleanType(), True),
              StructField('CallTypeGroup', StringType(), True),
              StructField('NumAlarms', IntegerType(), True),
              StructField('UnitType', StringType(), True),
              StructField('UnitSequenceInCallDispatch', IntegerType(), True),
              StructField('FirePreventionDistrict', StringType(), True),
              StructField('SupervisorDistrict', StringType(), True),
              StructField('Neighborhood', StringType(), True),
              StructField('Location', StringType(), True),
              StructField('RowID', StringType(), True),
              StructField('Delay', FloatType(), True)])

# Using the DataFrameReader Interface to read the CSV file into Spark.
fire_df = spark.read.csv(sf_fire_datasets, header = True, schema = fire_schema)
            

### Accessing Columns from spark dataframes

> To access columns, use ```.columns```
> You can access them individual columnnames by 
> ```.columns[index]```

> Columns are stored as a list, so you can iterate through it.
```for col in df.columns```

In [6]:
fire_df.columns
fire_df.columns[0]
for col in fire_df.columns:
    print(col)

CallNumber
UnitID
IncidentNumber
CallType
CallDate
WatchDate
CallFinalDisposition
AvailableDtTm
Address
City
Zipcode
Battalion
StationArea
Box
OriginalPriority
Priority
FinalPriority
ALSUnit
CallTypeGroup
NumAlarms
UnitType
UnitSequenceInCallDispatch
FirePreventionDistrict
SupervisorDistrict
Neighborhood
Location
RowID
Delay


In [7]:
fire_df.show()

24/06/16 20:19:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

In [8]:
fire_df.printSchema()
# fire_df.createOrReplaceTempView("temp_tbl")

# spark.sql("""
        #   select * from temp_tbl limit 100
        #   """)

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [9]:
#?SELECT
fire_df.select("CallNumber", "UnitID", "IncidentNumber").show(5, truncate = False )

#?GROUP BY, ORDER BY, SELECT, WHERE: Count the number of rows that are in each unique call type.
(fire_df.select("CallType")
            .where(col("CallType").isNotNull()) #*Is this step needed, does spark innately possess the ability to remove rows with null values?
            .groupBy("CallType")
            .count()
            .orderBy('count', ascending = False)
            .show(truncate = False)
        )

(fire_df.count())

+----------+------+--------------+
|CallNumber|UnitID|IncidentNumber|
+----------+------+--------------+
|20110016  |T13   |2003235       |
|20110022  |M17   |2003241       |
|20110023  |M41   |2003242       |
|20110032  |E11   |2003250       |
|20110043  |B04   |2003259       |
+----------+------+--------------+
only showing top 5 rows



TypeError: 'str' object is not callable

24/06/16 20:19:56 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Schemas in Spark Dataframes

### There are some benefits of declaring a schema early before reading in datasource. (Schema on Read)

> * You relieve Spark from the onus of inferring data types.
> 
> * You prevent spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time consuming.
> 
> * You can detect errors early if data doesn't match the schema.


### Two ways to define a Schema
> Spark allows you to define a schema in two ways. 
> * One is to define it programmatially, 
> 
> * The other is to employ a Data Definition Language (DDL) string, which is much simpler and easier to read.


To define a schema programmatically for a DataFrame with three named columns, author, title, and pages, you can use the Spark DataFrame API. 

For example:

In Scala
```
import org.apache.spark.sql.types._
val schema = StructType(Array(
                        StructField("author", StringType, false),
                        StructField("title", StringType, false), 
                        StructField("pages", IntegerType, false)
                        ))
```
In Python

```
from pyspark.sql.types import *
schema = StructType([StructField("author", StringType(), False),
      StructField("title", StringType(), False),
      StructField("pages", IntegerType(), False)])
```

Defining the same schema using DDL is much simpler:

In Scala:
```
val schema = "author STRING, title STRING, pages INT"
```

Python
```
schema = "author STRING, title STRING, pages INT"
```


#import org.apache.spark.sql.types#

Read up on this to get more information on how to use spark types.

In [None]:
(fire_df.select(
                 "CallNumber"
                ,"UnitID"
                ,"IncidentNumber"
                ,"CallType"
                ,"CallDate"
                ,"WatchDate"
                ,"CallFinalDisposition"
                ,"AvailableDtTm"
                ,"Address"
                ,"City"
                ,"Zipcode"
                ,"Battalion"
                ,"StationArea"
                ,"Box"
                ,"OriginalPriority"
                ,"Priority"
                ,"FinalPriority"
                ,"ALSUnit"
                ,"CallTypeGroup"
                ,"NumAlarms"
                ,"UnitType"
                ,"UnitSequenceInCallDispatch"
                ,"FirePreventionDistrict"
                ,"SupervisorDistrict"
                ,"Neighborhood"
                ,"Location"
                ,"RowID"
                ,"Delay"
                )
                .show(10)
                )

24/06/16 17:41:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [None]:
# Use .schema to retrieve the DDL from the declared df, if you want to reuse it.
print(fire_df.schema)

# Retrieve the inside of the field
print(fire_df.schema['UnitID'])

#Use this 
print(fire_df.printSchema())

StructType([StructField('CallNumber', IntegerType(), True), StructField('UnitID', StringType(), True), StructField('IncidentNumber', IntegerType(), True), StructField('CallType', StringType(), True), StructField('CallDate', StringType(), True), StructField('WatchDate', StringType(), True), StructField('CallFinalDisposition', StringType(), True), StructField('AvailableDtTm', StringType(), True), StructField('Address', StringType(), True), StructField('City', StringType(), True), StructField('Zipcode', IntegerType(), True), StructField('Battalion', StringType(), True), StructField('StationArea', StringType(), True), StructField('Box', StringType(), True), StructField('OriginalPriority', StringType(), True), StructField('Priority', StringType(), True), StructField('FinalPriority', IntegerType(), True), StructField('ALSUnit', BooleanType(), True), StructField('CallTypeGroup', StringType(), True), StructField('NumAlarms', IntegerType(), True), StructField('UnitType', StringType(), True), St

## Reading from JSON Files 

You can read the data from the json files too.
Try to read in the blogs.json file in the ./databricks-datasets

Note : blogs.json used here is not a typical JSON file. You'll likely run into problems when running with actual json files.
https://spark.apache.org/docs/latest/sql-data-sources-json.html

In [None]:
blogs_path  = "/Users/kiang/Desktop/PythonProjects/Repos/ApacheSparkLearning/LearningSparkV2/databricks-datasets/learning-spark-v2/blogs.json"

# Without defining, its schema on read.
blogs_df = spark.read.json(blogs_path)

blogs_df.show()

+--------------------+---------+-----+---+-------+---------+-----------------+
|           Campaigns|    First| Hits| Id|   Last|Published|              Url|
+--------------------+---------+-----+---+-------+---------+-----------------+
| [twitter, LinkedIn]|    Jules| 4535|  1|  Damji| 1/4/2016|https://tinyurl.1|
| [twitter, LinkedIn]|   Brooke| 8908|  2|  Wenig| 5/5/2018|https://tinyurl.2|
|[web, twitter, FB...|    Denny| 7659|  3|    Lee| 6/7/2019|https://tinyurl.3|
|       [twitter, FB]|Tathagata|10568|  4|    Das|5/12/2018|https://tinyurl.4|
|[web, twitter, FB...|    Matei|40578|  5|Zaharia|5/14/2014|https://tinyurl.5|
| [twitter, LinkedIn]|  Reynold|25568|  6|    Xin| 3/2/2015|https://tinyurl.6|
+--------------------+---------+-----+---+-------+---------+-----------------+



## Columns and Expressions (Page. 54)

You can also use logical or mathematical expressions on columns.

You could create using
```expr("columnName  * 5")``` or 
```(expr("columnName1 - 5") > col("columnName"))```
where colname is a spark type.


```expr()``` is a part of the pyspark.sql.functions (python) and org.apache.spark.sql.functions (Scala) packages

Note: Where do you use the expr() function? 
* You can use it inside the ```.select()``` statement i.e. ```df.select(expr())```. It will compute values base on the expression.
* Or you could use it inside the ```.where()``` i.e ```df.select().where(expr())``` statement.
* You can also use it within the ```.withColumn("Name" , expr("ColumnName * 2"))```

In [None]:
# Adds a boolean row.
(blogs_df.select('*'
                 ,expr("First == 'Jules'")
                 ,expr("Hits * 2")
                 ).show())

+--------------------+---------+-----+---+-------+---------+-----------------+---------------+----------+
|           Campaigns|    First| Hits| Id|   Last|Published|              Url|(First = Jules)|(Hits * 2)|
+--------------------+---------+-----+---+-------+---------+-----------------+---------------+----------+
| [twitter, LinkedIn]|    Jules| 4535|  1|  Damji| 1/4/2016|https://tinyurl.1|           true|      9070|
| [twitter, LinkedIn]|   Brooke| 8908|  2|  Wenig| 5/5/2018|https://tinyurl.2|          false|     17816|
|[web, twitter, FB...|    Denny| 7659|  3|    Lee| 6/7/2019|https://tinyurl.3|          false|     15318|
|       [twitter, FB]|Tathagata|10568|  4|    Das|5/12/2018|https://tinyurl.4|          false|     21136|
|[web, twitter, FB...|    Matei|40578|  5|Zaharia|5/14/2014|https://tinyurl.5|          false|     81156|
| [twitter, LinkedIn]|  Reynold|25568|  6|    Xin| 3/2/2015|https://tinyurl.6|          false|     51136|
+--------------------+---------+-----+---+----

In [None]:
# Use expression to filter a value.
blogs_df.select('*').where(expr("First == 'Jules'")).show()

In [None]:
print(blogs_df.printSchema())

# Notice Publish is a string. Because this table is schema on Read, spark inferred Published wrong, thus, order by will not work
# correctly on strings.
(blogs_df
 .withColumn('FullName', concat(expr("First") , expr("Last"))) # Used in withColumn
 .select('*')
 .orderBy("Hits")
 ).show()

### Important Note:

> Column Objects in a Dataframe cannot exist in isolation (i.e. $"FullName") each column is a part of a row in a record and all the rows together constitute a Dataframe, which was we will learn later in the chapter,
> is really a Dataset[Row] in Scala.
>
> Next Pg 57. on Rows*** Code in Scala.

Will continue on to Dataframe Operations pg 58.

# Common Dataframe Operations 

Reading in a Large CSV file from `sf-fire-calls.csv`
You can use DataFrameReader and DataFrameWriter and its *methods* to read and write a bunch of formats.
* Avro
* JSON
* CSV
* Parquet - A columnar format that uses snappy compression to compress the data.
If the dataframe is written as parquet, it preserves the format as part of the Parquet metadata.
* Text
* ORC


# Reading in dataframes can go wrong with defined schemas! 

 Here i try defining the schema first. The location field looks likes this *(13.3123543245, 13.5456755)*

https://stackoverflow.com/questions/71969652/read-csv-that-contains-array-of-string-in-pyspark

 *CallDate* and *WatchDate* fields both are YYYY/MM/DD formats in the CSV file, but they are not read in properly.



 *CallType* fields is read in as NULL, means something went wrong.
 
 Quickly realise 2 things. first, i cannot define the schema with complex datatypes when reading in a CSV file. You will have to read in complex types as Strings first.
 

Writing using DataFrameWriter is as good as `spark.write.format('parquet').save(file_path_to_save_to)`

Alternatively, you can save it as a **Table** when you are writing using DataFrameWriters, this registers the metadata in 
the *Hive metadata store*.
`spark.write.format('parquet').saveAsTable(file_path_to_save_to)`

In [10]:
test_schema = StructType([StructField('CallNumber',IntegerType(),True)
                         ,StructField("UnitID",StringType(),True)
                         ,StructField("IncidentNumber",StringType(),True)
                         ,StructField("CallType",StringType(),True)
                         ,StructField("CallDate",DateType(),True)
                         ,StructField("WatchDate",DateType(),True)
                         ,StructField("CallFinalDisposition",StringType(),True)
                         ,StructField("AvailableDtTm",DateType(),True)
                         ,StructField("Address",StringType(),True)
                         ,StructField("City",StringType(),True)
                         ,StructField("Zipcode",StringType(),True)
                         ,StructField("Battalion",StringType(),True)
                         ,StructField("StationArea",StringType(),True)
                         ,StructField("Box",StringType(),True)
                         ,StructField("OriginalPriority",IntegerType(),True)
                         ,StructField("Priority",IntegerType(),True)
                         ,StructField("FinalPriority",IntegerType(),True)
                         ,StructField("ALSUnit", StringType(),True)
                         ,StructField("CallTypeGroup",StringType(),True)
                         ,StructField("NumAlarms",IntegerType(),True)
                         ,StructField("UnitType",StringType(),True)
                         ,StructField("UnitSequenceInCallDispatch",IntegerType(),True)
                         ,StructField("FirePreventionDistrict",IntegerType(),True)
                         ,StructField("SupervisorDistrict",IntegerType(),True)
                         ,StructField("Neighborhood",StringType() ,True)
                        #  ,StructField("Location",ArrayType(DecimalType(), True),True) # Location is a tuple of 
                         ,StructField("Location", StringType() ,True) # Use this will allow data to read successfully.
                         ,StructField("RowID",StringType(),True)
                         ,StructField("Delay",DecimalType(),True)])

In [11]:
calls_path = "/Users/kiang/Desktop/PythonProjects/Repos/ApacheSparkLearning/LearningSparkV2/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

# Reads in by default all strings
!# You cannot read in the csv data with a Schema of Complex Types! Read them in as strings first.
calls_df = spark.read.format('csv').load(calls_path, header = True, schema = test_schema)

In [12]:
calls_df.show(3, truncate = False)

+----------+------+--------------+----------------+--------+---------+--------------------+-------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+---------------------+-------------------------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|CallType        |CallDate|WatchDate|CallFinalDisposition|AvailableDtTm|Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood         |Location                             |RowID        |Delay|
+----------+------+--------------+----------------+--------+---------+--------------------+-------------+---------------------------+----+-------+---------+-----------+----+----------------+--

Hang on.. these fields **CallDate** **WatchDate** are not reading in correctly...

Looks like all of them are NULLs.

In [None]:
calls_df.select("CallNumber" 
         ,"UnitID"
         ,"CallType"
         ,"CallDate"
         ,"WatchDate"
         ,"CallTypeGroup").where(col("CallDate").isNotNull()).show()


AttributeError: 'list' object has no attribute 'select'

Use the `f.to_timestamp(<Variable> , 'format')` to change dates into their appropriate format from a String. 

Note the 2nd Parameter here `format` is the format that the string is in, not the format u want to write it into!

I.e. `withColumn(Name, to_timestamp(Name, 'yyyy/mm/dd')`

Lets try it again, by reading in the dataset again. This time with all String Types

In [None]:
test_schema = StructType([StructField('CallNumber',IntegerType(),True)
                         ,StructField("UnitID",StringType(),True)
                         ,StructField("IncidentNumber",StringType(),True)
                         ,StructField("CallType",StringType(),True)
                         ,StructField("CallDate",StringType(),True)
                         ,StructField("WatchDate",StringType(),True)
                         ,StructField("CallFinalDisposition",StringType(),True)
                         ,StructField("AvailableDtTm",StringType(),True)
                         ,StructField("Address",StringType(),True)
                         ,StructField("City",StringType(),True)
                         ,StructField("Zipcode",StringType(),True)
                         ,StructField("Battalion",StringType(),True)
                         ,StructField("StationArea",StringType(),True)
                         ,StructField("Box",StringType(),True)
                         ,StructField("OriginalPriority",IntegerType(),True)
                         ,StructField("Priority",IntegerType(),True)
                         ,StructField("FinalPriority",IntegerType(),True)
                         ,StructField("ALSUnit", StringType(),True)
                         ,StructField("CallTypeGroup",StringType(),True)
                         ,StructField("NumAlarms",IntegerType(),True)
                         ,StructField("UnitType",StringType(),True)
                         ,StructField("UnitSequenceInCallDispatch",IntegerType(),True)
                         ,StructField("FirePreventionDistrict",IntegerType(),True)
                         ,StructField("SupervisorDistrict",IntegerType(),True)
                         ,StructField("Neighborhood",StringType() ,True)
                        #  ,StructField("Location",ArrayType(DecimalType(), True),True) # Location is a tuple of 
                         ,StructField("Location", StringType() ,True) # Location is a tuple of 
                         ,StructField("RowID",StringType(),True)
                         ,StructField("Delay",DecimalType(),True)])
calls_df = spark.read.format('csv').load(calls_path, header = True, schema = test_schema)

In [None]:
calls_df = (calls_df.select("*")
 .withColumn("CallDateAsDt", to_date(col("CallDate"), 'dd/mm/yyyy'))
 .withColumn("CallDate", to_timestamp(col("CallDate"), "dd/mm/yyyy"))
 .withColumn("WatchDate", to_timestamp(col("WatchDate"), "dd/mm/yyyy"))
# .distinct()
)

calls_df.take(5)

[Row(CallNumber=20110016, UnitID='T13', IncidentNumber='2003235', CallType='Structure Fire', CallDate=datetime.datetime(2002, 1, 1, 0, 11), WatchDate=datetime.datetime(2002, 1, 1, 0, 10), CallFinalDisposition='Other', AvailableDtTm='01/11/2002 01:51:44 AM', Address='2000 Block of CALIFORNIA ST', City='SF', Zipcode='94109', Battalion='B04', StationArea='38', Box='3362', OriginalPriority=3, Priority=3, FinalPriority=3, ALSUnit='false', CallTypeGroup=None, NumAlarms=1, UnitType='TRUCK', UnitSequenceInCallDispatch=2, FirePreventionDistrict=4, SupervisorDistrict=5, Neighborhood='Pacific Heights', Location='(37.7895840679362, -122.428071912459)', RowID='020110016-T13', Delay=Decimal('3'), CallDateAsDt=datetime.date(2002, 1, 1)),
 Row(CallNumber=20110022, UnitID='M17', IncidentNumber='2003241', CallType='Medical Incident', CallDate=datetime.datetime(2002, 1, 1, 0, 11), WatchDate=datetime.datetime(2002, 1, 1, 0, 10), CallFinalDisposition='Other', AvailableDtTm='01/11/2002 03:01:18 AM', Address

# Transformations and Actions 
* Renaming, adding and dropping columns 
* Aggregations
* The Dataset API. (Scala and Java)

Now you can use pyspark.sql.functions `.month()`, `year()`, and `day()` to analyse the data further.

`.avg()` ,`.min()`, `.max()`, `.sum()`, `.distinct()` for basic Aggregations

In [None]:
(calls_df.select(year("CallDate"))
.distinct()
.orderBy(year("CallDate"))
.show(5))



+--------------+
|year(CallDate)|
+--------------+
|          2000|
|          2001|
|          2002|
|          2003|
|          2004|
+--------------+
only showing top 5 rows



                                                                                

The dataframe API offers the `collect()` method. For extremely large operations this is highly computationally intensive, and lead to OOM exceptions.

Unlike `count()` which returns a single number to the driver.

`collect()`` returns a collection of all the `Row`` objects in the entire Dataframe object to the driver.

Use `take(n)`, which returns the n number of `row objects` back to the driver.
