<a href="https://colab.research.google.com/github/jcmo-tec21/Tec_IA-Analytics/blob/main/CD3002_601_2013_Notebook_00_PySpark_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Enabling Spark on Google Colab with Python

Installation is quite simple on [Google Colab](https://colab.research.google.com/). Spark is enabled through Python, using the library [PySpark](https://spark.apache.org/docs/latest/api/python/#:~:text=PySpark%20is%20the%20Python%20API,for%20interactively%20analyzing%20your%20data).

The library can be installed with `pip install pyspark`.


In [13]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [34]:
import pyspark
import pandas as pd

# Alternative source (disabled): a CSV file hosted on GitHub
#url = 'https://raw.githubusercontent.com/jcmo-tec21/cd3002.601.2023/main/CD3002.601.2023_Notebook00%20-%20Sheet1.csv?token=GHSAT0AAAAAACCWE65QV7FQQAVHSI7MYEBOZDOMVYA'
#df1 = pd.read_csv(url)
#print(df1)

# Load Colab's bundled California housing sample with pandas, for later comparison with Spark
df2 = pd.read_csv('sample_data/california_housing_train.csv')
print(df2)


       longitude  latitude  housing_median_age  total_rooms  total_bedrooms  \
0        -114.31     34.19                15.0       5612.0          1283.0   
1        -114.47     34.40                19.0       7650.0          1901.0   
2        -114.56     33.69                17.0        720.0           174.0   
3        -114.57     33.64                14.0       1501.0           337.0   
4        -114.57     33.57                20.0       1454.0           326.0   
...          ...       ...                 ...          ...             ...   
16995    -124.26     40.58                52.0       2217.0           394.0   
16996    -124.27     40.69                36.0       2349.0           528.0   
16997    -124.30     41.84                17.0       2677.0           531.0   
16998    -124.30     41.80                19.0       2672.0           552.0   
16999    -124.35     40.54                52.0       1820.0           300.0   

       population  households  median_income  media

## Step 1.- Creating the Spark session

`SparkSession` (from `pyspark.sql`) is the entry point to Spark's DataFrame and SQL functionality; it is used to create and handle DataFrames.

In [21]:
from pyspark.sql import SparkSession

In [22]:
spark = SparkSession.builder.appName('Practica').getOrCreate()
spark

As we can see, Colab installed version `3.4.0` of Spark. It is also possible to install other versions, such as `3.1.1`, when working outside this practice environment.

## Step 2.- Reading data

We can read data in multiple formats with Spark through `spark.read` (a `DataFrameReader`). The most common reader methods are:

* csv

* jdbc (from databases with JDBC connector)

* json

* parquet

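* schema (not a format by itself: it sets the expected schema before reading with any of the other methods)

* table (a table registered in the Spark catalog)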

* text



## Data samples

Colab provides some handy data samples:

* `california_housing_data_*.csv` is California housing data from the 1990 US Census; more information is available at: https://developers.google.com/machine-learning/crash-course/california-housing-data-description

* `mnist_*.csv`  is a small sample of the MNIST database, which is described at: http://yann.lecun.com/exdb/mnist/

* `anscombe.json` contains a copy of Anscombe's quartet; it was originally described in Anscombe, F. J. (1973). 'Graphs in Statistical Analysis'. American Statistician. 27 (1): 17-21. JSTOR 2682899.

* [vega dataset library](https://github.com/altair-viz/vega_datasets)


From these, let us illustrate Spark's functionality with `california_housing_train.csv`.

In [31]:
df_spark=spark.read.csv('sample_data/california_housing_train.csv')

The data contains:

* `9` columns

* 17,000 rows (because no `header` option is set here, Spark also reads the header line as an extra row)



In [35]:
df_spark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]

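`show()` prints the top 20 rows of the DataFrame by default. Since no header option was set, the columns are named `_c0` to `_c8` and the header line appears as the first data row.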

In [37]:
df_spark.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|        _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|    

Let us now use the first line of the file as the column names, by setting the `header` option:

In [39]:
df_spark=spark.read.option('header','true').csv('sample_data/california_housing_train.csv')

In [40]:
df_spark

DataFrame[longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

In [41]:
df_spark.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|      73400.000000|
|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000| 262.000000|     1.925000|    

Let us now check the type of `df_spark`.

Note that a Spark DataFrame is not the same as a pandas DataFrame (or, for comparison, a Polars DataFrame).

In [42]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

The `head(n)` method returns the first `n` rows as a list of `Row` objects. Note that all values are still strings:

In [43]:
df_spark.head(10)

[Row(longitude='-114.310000', latitude='34.190000', housing_median_age='15.000000', total_rooms='5612.000000', total_bedrooms='1283.000000', population='1015.000000', households='472.000000', median_income='1.493600', median_house_value='66900.000000'),
 Row(longitude='-114.470000', latitude='34.400000', housing_median_age='19.000000', total_rooms='7650.000000', total_bedrooms='1901.000000', population='1129.000000', households='463.000000', median_income='1.820000', median_house_value='80100.000000'),
 Row(longitude='-114.560000', latitude='33.690000', housing_median_age='17.000000', total_rooms='720.000000', total_bedrooms='174.000000', population='333.000000', households='117.000000', median_income='1.650900', median_house_value='85700.000000'),
 Row(longitude='-114.570000', latitude='33.640000', housing_median_age='14.000000', total_rooms='1501.000000', total_bedrooms='337.000000', population='515.000000', households='226.000000', median_income='3.191700', median_house_value='73400

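The schema can now be inspected with `printSchema()`. The method must be called with parentheses: without them, the notebook only displays the bound method (first cell below) instead of printing the schema tree (second cell).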

In [46]:
df_spark.printSchema

<bound method DataFrame.printSchema of DataFrame[longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]>

In [48]:
df_spark.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



Every column was read as `string`. To have Spark infer the column types from the data, we pass `inferSchema=True` together with `header=True`:

In [49]:
df_spark = spark.read.csv('sample_data/california_housing_train.csv',header=True, inferSchema=True)

Checking the output:

In [50]:
df_spark.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [51]:
df_spark.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



The object is still a `pyspark.sql.dataframe.DataFrame`; only the column types changed.

In [52]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

The column names of `df_spark` are returned by the `columns` attribute:

In [53]:
df_spark.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

Let us explore the top 10 rows in the data frame.

In [54]:
df_spark.head(10)

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=85700.0),
 Row(longitude=-114.57, latitude=33.64, housing_median_age=14.0, total_rooms=1501.0, total_bedrooms=337.0, population=515.0, households=226.0, median_income=3.1917, median_house_value=73400.0),
 Row(longitude=-114.57, latitude=33.57, housing_median_age=20.0, total_rooms=1454.0, total_bedrooms=326.0, population=624.0, households=262.0, median_income=1.925, median_house_value=65500.0),
 Row(longitude=-114.58, latitud

Alternatively, `show(10)` prints the same rows as a formatted table:

In [55]:
df_spark.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

Let us deal with column selection now. For that, we use the DataFrame method `select()`.

In [57]:
df_spark.select('longitude')

DataFrame[longitude: double]

The selection is returned as a **new** DataFrame; `df_spark` itself is not modified.

**What type of operation is this: a transformation or an action?**

In [58]:
df_spark.select('longitude').show()

+---------+
|longitude|
+---------+
|  -114.31|
|  -114.47|
|  -114.56|
|  -114.57|
|  -114.57|
|  -114.58|
|  -114.58|
|  -114.59|
|  -114.59|
|   -114.6|
|   -114.6|
|   -114.6|
|  -114.61|
|  -114.61|
|  -114.63|
|  -114.65|
|  -114.65|
|  -114.65|
|  -114.66|
|  -114.67|
+---------+
only showing top 20 rows



To select two or more columns, we proceed the same way, passing the column names as separate arguments to `select()`.

In [59]:
df_spark.select('longitude','latitude')

DataFrame[longitude: double, latitude: double]

In [60]:
df_spark.select('longitude','latitude').show()

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -114.31|   34.19|
|  -114.47|    34.4|
|  -114.56|   33.69|
|  -114.57|   33.64|
|  -114.57|   33.57|
|  -114.58|   33.63|
|  -114.58|   33.61|
|  -114.59|   34.83|
|  -114.59|   33.61|
|   -114.6|   34.83|
|   -114.6|   33.62|
|   -114.6|    33.6|
|  -114.61|   34.84|
|  -114.61|   34.83|
|  -114.63|   32.76|
|  -114.65|   34.89|
|  -114.65|    33.6|
|  -114.65|   32.79|
|  -114.66|   32.74|
|  -114.67|   33.92|
+---------+--------+
only showing top 20 rows



Another way to retrieve the column types is the `dtypes` attribute (a property, so no parentheses are needed):

In [66]:
df_spark.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]

This information is returned as a plain Python `list` of `(name, type)` tuples, not as a DataFrame.

In [68]:
type(df_spark.dtypes)

list

Now, let us see how to add columns to an existing DataFrame. This is done with the `withColumn()` method, which returns a new DataFrame.

Each call to `withColumn()` adds a single column.

In this example, we add the new column `latitude_2`, defined as `latitude + 100` from the original DataFrame.

In [78]:
df_spark.withColumn('latitude_2',df_spark['latitude']+100)

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, latitude_2: double]

In [79]:
df_spark.withColumn('latitude_2',df_spark['latitude']+100).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|        latitude_2|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|            134.19|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|             134.4|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|            133.69|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|            133.64|
|  -114.57|  

**Note:** `withColumn()` can also "modify" an existing column: **if the name given as first argument already exists, that column is replaced** instead of a new one being added.



Dropping columns is straightforward with the `drop()` method. First, we keep the DataFrame with the new column as `df_spark_2`:

In [74]:
df_spark_2=df_spark.withColumn('latitude_2',df_spark['latitude']+100)

In [80]:
df_spark_2.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'latitude_2']

In [81]:
df_spark_2.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|        latitude_2|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|            134.19|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|             134.4|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|            133.69|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|            133.64|
|  -114.57|  

Let us drop the `latitude_2` column.

In [82]:
df_spark_2=df_spark_2.drop('latitude_2')

In [83]:
df_spark_2.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [84]:
df_spark_2.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

Renaming columns is also simple, with the `withColumnRenamed()` method:

1. The first argument is the existing column name.

2. The second argument is the new column name.

In [85]:
df_spark_2=df_spark_2.withColumnRenamed('latitude','latitud')

In [86]:
df_spark_2.columns

['longitude',
 'latitud',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [87]:
df_spark_2.show(10)

+---------+-------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitud|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+-------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|  34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|   34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|  33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|  33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|  33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|  33.63|             

We'll continue the study of data engineering functionalities in Spark throughout the notebooks in the [GitHub repo of this module block](https://github.com/jcmo-tec21/cd3002.601.2023).


Thanks!