### Example notebook: a pattern that mixes Hive SQL, pandas and PySpark

__First, open a Spark session__

In [1]:
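# Load the helper script; connect_to_datalake comes from utils_newcluster.py
%run ./utils_newcluster.py
from pyspark_llap import HiveWarehouseSession

spark = connect_to_datalake("exampleNotebook")
# Build a Hive Warehouse Connector (HWC) session on top of the Spark session
hive = HiveWarehouseSession.session(spark).build()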

In [2]:
# Functions and datatypes

import pyspark.sql.functions as F
from pyspark.sql.types import *

__First, an example that uses Hive to create a pandas DataFrame. Pattern: do the heavy data transformation in Hive SQL, then download the much smaller result to pandas__

In [3]:
# Set database

hive.setDatabase('''appl_transport_index''')

__Execute a Hive query and apply the toPandas method to create a pandas DataFrame__

In [4]:
# The aggregation runs in Hive; toPandas() only downloads the aggregated rows to the driver
df_week = hive.executeQuery('''
    select
         max(c.enddate)                   as start_date
        ,c.weeknumber                     as week_number
        ,b.countryname                    as country_name
        ,b.segment                        as segment
        ,count(a.chassisnumber)           as number_of_vehicles
        ,c.yearnumber                     as year_number
        ,avg(totalruntime)/numberofdays   as mean_runtime_per_day
        ,avg(odometer)/numberofdays       as mean_driven_distance_per_day
    from vehicle_time_series_weekly_fact a
    inner join vehicle_dimension b
        on a.chassisnumber = b.chassisnumber
    inner join week_dim c
        on a.startdate = c.startdate
    where (
           (a.startdate between ("2020-01-05") and ("2021-01-05"))
        or (a.startdate between ("2019-01-05") and ("2019-10-07"))
    )
    and a.age_category in ("age_0_3", "age_3_6", "age_6_9", "age_9__")
    group by c.yearnumber, c.weeknumber, b.countryname, b.segment, c.numberofdays
''').toPandas()
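The two per-day metrics in the query divide the weekly average by `numberofdays` from `week_dim`. As a minimal pure-Python sketch of that arithmetic, the code below groups rows on the same keys as the `group by` clause and computes the `count` and `avg(...)/numberofdays` expressions. The `WeeklyRow` type, the function name and the sample values are hypothetical and only illustrate the calculation; the real work stays in Hive.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class WeeklyRow:
    # Hypothetical stand-in for one joined row of
    # vehicle_time_series_weekly_fact, vehicle_dimension and week_dim
    year_number: int
    week_number: int
    country_name: str
    segment: str
    number_of_days: int
    chassis_number: str
    total_runtime: float
    odometer: float


def weekly_per_day_means(rows):
    """Group like the Hive query and compute count and avg(x)/numberofdays per group."""
    groups = defaultdict(list)
    for r in rows:
        key = (r.year_number, r.week_number, r.country_name, r.segment, r.number_of_days)
        groups[key].append(r)

    result = {}
    for key, members in groups.items():
        number_of_days = key[-1]
        n = len(members)  # count(a.chassisnumber), assuming no NULL chassis numbers
        mean_runtime = sum(m.total_runtime for m in members) / n
        mean_distance = sum(m.odometer for m in members) / n
        result[key] = {
            "number_of_vehicles": n,
            "mean_runtime_per_day": mean_runtime / number_of_days,
            "mean_driven_distance_per_day": mean_distance / number_of_days,
        }
    return result


# Made-up example: two vehicles in the same 7-day week
rows = [
    WeeklyRow(2020, 25, "Australia", "Distribution", 7, "A1", 21.0, 700.0),
    WeeklyRow(2020, 25, "Australia", "Distribution", 7, "A2", 35.0, 1400.0),
]
stats = weekly_per_day_means(rows)
print(stats[(2020, 25, "Australia", "Distribution", 7)])
# {'number_of_vehicles': 2, 'mean_runtime_per_day': 4.0, 'mean_driven_distance_per_day': 150.0}
```

Here the weekly average runtime is (21 + 35) / 2 = 28, which divided by 7 days gives 4.0 per day.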

__df_week is now a pandas DataFrame, so we can continue with the pandas API for more sophisticated analysis__

In [5]:
type(df_week)

pandas.core.frame.DataFrame

In [7]:
df_week.head()

|   | start_date | week_number | country_name | segment | number_of_vehicles | year_number | mean_runtime_per_day | mean_driven_distance_per_day |
|---|---|---|---|---|---|---|---|---|
| 0 | 2019-06-16 | 24 | Afghanistan | Haulage | 4 | 2019 | 4.397917 | 270.131429 |
| 1 | 2020-07-19 | 29 | Australia | Buses other | 1581 | 2020 | 6.030818 | 149.595017 |
| 2 | 2019-03-03 | 9 | Australia | Distribution | 156 | 2019 | 3.76577 | 168.166813 |
| 3 | 2020-06-21 | 25 | Australia | Distribution | 193 | 2020 | 3.66888 | 171.606669 |
| 4 | 2020-12-06 | 49 | Australia | Distribution | 210 | 2020 | 3.980106 | 183.790476 |


__You can also use the PySpark API. In that case, start with a spark.sql query, which returns a Spark DataFrame__

In [None]:
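# Assumes a temporary view named stage_dim1 was registered earlier in the notebook;
# the query adds a segment column derived from segment_pgr and segment_ntg
stage_dim1 = spark.sql('''
    select *
          ,case
               when segment_pgr = 'L'             then 'Long Haulage'
               when segment_pgr = 'C'             then 'Construction'
               when segment_pgr = 'D'             then 'Distribution'
               when segment_ntg = 'Long distance' then 'Long Haulage'
               else segment_ntg
           end as segment
    from stage_dim1
''')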
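A `case` expression returns the result of the first `when` branch that matches, so branch order matters: a row with `segment_pgr = 'C'` becomes 'Construction' even when `segment_ntg` is 'Long distance'. The plain-Python function below is a hypothetical sketch of the same mapping (the name `derive_segment` and the code 'X' are illustrative), which can be useful for unit-testing the logic outside Spark. In SQL a NULL `segment_pgr` never equals 'L', 'C' or 'D', so it falls through to the `segment_ntg` branches; `None` behaves the same way here.

```python
from typing import Optional

# segment_pgr codes handled by the first three WHEN branches
PGR_TO_SEGMENT = {"L": "Long Haulage", "C": "Construction", "D": "Distribution"}


def derive_segment(segment_pgr: Optional[str], segment_ntg: Optional[str]) -> Optional[str]:
    """Hypothetical Python mirror of the CASE expression used to build stage_dim1."""
    # segment_pgr branches are checked first, exactly as in the SQL
    if segment_pgr in PGR_TO_SEGMENT:
        return PGR_TO_SEGMENT[segment_pgr]
    if segment_ntg == "Long distance":
        return "Long Haulage"
    # ELSE branch: keep segment_ntg (None stays None, like SQL NULL)
    return segment_ntg


print(derive_segment("C", "Long distance"))   # Construction
print(derive_segment(None, "Long distance"))  # Long Haulage
print(derive_segment("X", "Buses other"))     # Buses other
```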


__stage_dim1 is a Spark DataFrame, so you can apply the PySpark API to it__

In [None]:
# Pattern matching to derive two new columns from engineinfo, then drop the source column
stage_dim2 = (
    stage_dim1
    .withColumn("enginetype", F.regexp_extract(F.col("engineinfo"), r"(\w+\d+\s+\d+)", 0))
    .withColumn("horsepower", F.regexp_extract(F.col("engineinfo"), r"(\d+ hp)", 0))
    .drop("engineinfo")
)
# Register the result as a temporary view so it can be queried with spark.sql
stage_dim2.createOrReplaceTempView("stage_dim2")
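`F.regexp_extract(col, pattern, idx)` returns the text captured by group `idx` of the first match (0 means the whole match) and an empty string when nothing matches. The sketch below imitates that behaviour with Python's `re` module, so the two patterns can be tried on sample strings before running them on the cluster. The helper name and the sample `engineinfo` strings are made up; Spark uses Java regular expressions, which should behave like Python's `re` for these simple patterns on plain ASCII text.

```python
import re

ENGINE_TYPE_PATTERN = r"(\w+\d+\s+\d+)"
HORSEPOWER_PATTERN = r"(\d+ hp)"


def regexp_extract(text: str, pattern: str, idx: int) -> str:
    """Hypothetical local mimic of Spark's regexp_extract for non-null strings."""
    match = re.search(pattern, text)  # first (leftmost) match, like Spark
    if match is None or match.group(idx) is None:
        return ""  # Spark also returns an empty string when there is no match
    return match.group(idx)


# Made-up engineinfo values
for engineinfo in ["DC13 148 450 hp", "electric"]:
    print(repr(regexp_extract(engineinfo, ENGINE_TYPE_PATTERN, 0)),
          repr(regexp_extract(engineinfo, HORSEPOWER_PATTERN, 0)))
# 'DC13 148' '450 hp'
# '' ''
```

Note that `\w+` also matches digits, so the engine-type pattern backtracks until `\d+` can take at least one digit before the whitespace.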

__Finally, you can persist a Spark DataFrame as a physical Hive table. In the example below, the Spark DataFrame df_bus_segment is written as the Hive table ca_bus_segment in the Hive database project_transport_index__

In [None]:
hive.setDatabase('''project_transport_index''')

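# Write through the Hive Warehouse Connector; mode('overwrite') replaces the existing data in the target table
(
    df_bus_segment.write
    .mode('overwrite')
    .format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR)
    .option("table", 'project_transport_index.ca_bus_segment')
    .save()
)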