**Part 2: Spark DataFrame API**


Preliminaries

In [None]:
# System packages: Java 8 (the JDK that JAVA_HOME points at in the next cell) and Maven.
!apt-get update
!apt-get install -y openjdk-8-jdk-headless -qq
!apt-get install maven -qq

# Download and unpack the pinned Spark 2.4.5 distribution.
# --fail makes curl exit with an error on an HTTP failure instead of saving the
# server's error page as the tarball, which would only surface later as a tar error.
!curl --fail -L -o spark-2.4.5-bin-hadoop2.7.tgz "https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz"
# No -v: listing every extracted file buries the rest of the cell output.
!tar -xf spark-2.4.5-bin-hadoop2.7.tgz

# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q findspark
%pip install 'apache-airflow==2.2.5'

In [None]:
import os

# Both variables are set before findspark.init() is called; keep this order.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-2.4.5-bin-hadoop2.7",
)

import findspark

# Called before the first pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# One local session using every available core ("local[*]") with a 16g driver heap setting.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString, StringIndexer, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline


In [None]:
from pyspark.sql.functions import col, mean, min, max, col, lit,sum
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, DoubleType, StringType
from airflow import DAG 
from airflow.operators.python import PythonOperator , BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from random import randint 
from os import path

In [None]:
if os.getcwd().split("/")[-1]=="src":
  %cd ..
dir = os.getcwd()
!mkdir src 
!mkdir out 
%cd ./src 

/content/src


**Task 1**

Download the Parquet file and load it into a DataFrame

In [None]:
# Sparse checkout: only mlflow-project-example/data/ is materialised in the working tree.
!git init
# Register the remote only once so the cell can be re-run without "remote origin already exists".
# No -f here: the pull below already fetches master, so fetching every branch first is redundant.
!git remote get-url origin > /dev/null 2>&1 || git remote add origin https://github.com/databricks/LearningSparkV2
!git config core.sparseCheckout true
# Overwrite (>) rather than append (>>) so re-runs do not duplicate the pattern.
!echo 'mlflow-project-example/data/*' > .git/info/sparse-checkout
!git pull origin master

Initialized empty Git repository in /content/src/.git/
Updating origin
remote: Enumerating objects: 1720, done.[K
remote: Counting objects: 100% (143/143), done.[K
remote: Compressing objects: 100% (99/99), done.[K
remote: Total 1720 (delta 40), reused 89 (delta 24), pack-reused 1577[K
Receiving objects: 100% (1720/1720), 76.98 MiB | 13.87 MiB/s, done.
Resolving deltas: 100% (527/527), done.
From https://github.com/databricks/LearningSparkV2
 * [new branch]      master     -> origin/master
From https://github.com/databricks/LearningSparkV2
 * branch            master     -> FETCH_HEAD


In [None]:
# Location of the dataset inside the src/ checkout, built from the project root `dir`.
airbnb_parquet_path = dir+"/src/mlflow-project-example/data/sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(airbnb_parquet_path)
# Display the type of the loaded object as the cell output.
type(airbnbDF)

pyspark.sql.dataframe.DataFrame

**Task 2**

List the minimum price, maximum price, and total row count

In [None]:
# Minimum price, maximum price and row count computed in one aggregation.
# The F.-qualified functions are used so the cell does not depend on the builtins
# min/max being shadowed by the pyspark.sql.functions import.
task2df = airbnbDF.agg(
    F.min("price"),
    F.max("price"),
    F.count("*").alias("total_row"),
)
task2df.show()
# mode("overwrite") keeps the cell re-runnable. The previous bare `except:` reported
# every failure (bad path, permissions, ...) as "File already created".
task2df.write.mode("overwrite").csv(dir+"/out/out_2_2.txt")

+----------+----------+---------+
|min(price)|max(price)|total_row|
+----------+----------+---------+
|      10.0|   10000.0|     7146|
+----------+----------+---------+



**Task 3**

Calculate the average number of bathrooms and bedrooms for listings priced above 5000 that have a review score of exactly 10

In [None]:
# A listing qualifies when ANY of its review scores equals 10.
# NOTE(review): confirm that "any score" rather than one specific score column is intended.
has_score_of_ten = (
    (col("review_scores_value") == 10)
    | (col("review_scores_accuracy") == 10)
    | (col("review_scores_rating") == 10)
    | (col("review_scores_cleanliness") == 10)
    | (col("review_scores_checkin") == 10)
    | (col("review_scores_communication") == 10)
    | (col("review_scores_location") == 10)
)

# Average bedrooms and bathrooms over listings priced above 5000 that qualify.
task3df = (
    airbnbDF
    .where(col("price") > 5000)
    .where(has_score_of_ten)
    .agg(
        F.mean("bedrooms").alias("avg_bedrooms"),
        F.mean("bathrooms").alias("avg_bathrooms"),
    )
)
task3df.show()
# Written under out/ like the other tasks; the previous path dir+"/out_2_3.txt"
# put the result in the project root instead.
# mode("overwrite") keeps the cell re-runnable without a bare `except:` hiding real write errors.
task3df.write.mode("overwrite").csv(dir+"/out/out_2_3.txt")

+------------+-------------+
|avg_bedrooms|avg_bathrooms|
+------------+-------------+
|         3.0|          2.3|
+------------+-------------+



**Task 4**

Number of people that can be accommodated by the property with the lowest price and highest rating


In [None]:
# Take the price extremes from the data instead of hard-coding 10 and 10000,
# so the cell stays correct if the dataset changes.
price_bounds = airbnbDF.agg(
    F.min("price").alias("lowest"),
    F.max("price").alias("highest"),
).first()

# Total beds among listings at the lowest price ("low") and at the highest price ("high").
# when() without otherwise() yields null for non-matching rows, and sum() skips nulls.
# NOTE(review): the task heading mentions the highest *rating*, while this selects
# by highest *price* and sums the `beds` column; confirm which is intended.
task4df = airbnbDF.agg(
    F.sum(F.when(col("price") == price_bounds["lowest"], col("beds"))).alias("low"),
    F.sum(F.when(col("price") == price_bounds["highest"], col("beds"))).alias("high"),
)
task4df.show()
# mode("overwrite") keeps the cell re-runnable without a bare `except:` hiding real write errors.
task4df.write.mode("overwrite").csv(dir+"/out/out_2_4.txt")

+---+----+
|low|high|
+---+----+
|2.0| 5.0|
+---+----+



**Task 5**

Create an Airflow DAG

In [None]:
%%writefile task_2_5.py
## Tasks to be implemented (as there wasn´t any implementation especify, it just pass)
def task1():
  pass
def task2():
  pass
def task3():
  pass
def task4():
  pass
def task5():
  pass
def task6():
  pass
# Creation and run of the DAG
with DAG("my_dag", start_date =datetime(2022,1,1),schedule_interval="@daily",catchup=False) as dag:
  task1 = PythonOperator(task_id="task1", python_callable=task1)

  with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1:
    task2 = PythonOperator( task_id="task2", python_callable=task2 )
    task3 = PythonOperator(task_id="task3",python_callable=task3)
  
  with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2:
      task4 = PythonOperator(task_id="task4", python_callable=task4)
      task5 = PythonOperator(task_id="task5",python_callable=task5)
      task6 = PythonOperator(task_id="task6", python_callable=task6)
  

  task1 >> section_1 >> section_2 

Writing task_2_5.py
