## Pyspark SQL -- BigQuery (read/write)

In [1]:
# Transform events table into events_dwh
# from_unixtime --> datetime (yyyy-MM-DD HH:mm:ss)
# Run in Cloud Jupyter Notebook

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# credentials_location = '/root/.google/credentials/google-creds.json'


conf = SparkConf() \
    .setAppName('events') \
    .set("spark.jars", "/usr/lib/spark/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") 
#    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

project_id = "<project name>"
dataset_id = "project1"
table_source = "events"

df = spark.read.format('bigquery') \
    .option("temporaryGcsBucket","dataproc-temp-asia-southeast2-212352110204-1oi7hped") \
    .option("project", project_id) \
    .option("dataset", dataset_id) \
    .load(table_source)
    
df.createOrReplaceTempView("temp_events")

events_transform = spark.sql("""
select from_unixtime((timestamp / 1000), "yyyy-MM-dd HH:mm:ss") as timestamp, 
    visitorid, event, itemid, transactionid
from temp_events
""")

events_transform.show()

project_id = "<project name>"
dataset_id = "project1"
table_target = "events_dwh"
parttition_column = "DATE_FORMAT(timestamp, 'yyyy-MM')"
cluster_column = "event"

events_transform.write \
    .format("bigquery") \
    .option("temporaryGcsBucket","dataproc-temp-asia-southeast2-212352110204-1oi7hped") \
    .option("table", f"{project_id}.{dataset_id}.{table_target}") \
    .option("PARTITION BY",  parttition_column) \
    .option("CLUSTER BY", cluster_column) \
    .mode('Overwrite') \
    .save()

# Stop Spark session
spark.stop()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/01 01:26:09 INFO SparkEnv: Registering MapOutputTracker
24/04/01 01:26:09 INFO SparkEnv: Registering BlockManagerMaster
24/04/01 01:26:09 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/04/01 01:26:09 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

+-------------------+---------+-----+------+-------------+
|          timestamp|visitorid|event|itemid|transactionid|
+-------------------+---------+-----+------+-------------+
|2015-06-02 05:02:12|   257597| view|355908|            0|
|2015-06-02 05:50:14|   992329| view|248676|            0|
|2015-06-02 05:13:19|   111016| view|318965|            0|
|2015-06-02 05:12:35|   483717| view|253185|            0|
|2015-06-02 05:02:17|   951259| view|367447|            0|
|2015-06-02 05:48:06|   972639| view| 22556|            0|
|2015-06-02 05:12:03|   810725| view|443030|            0|
|2015-06-02 05:34:51|   794181| view|439202|            0|
|2015-06-02 04:54:59|   824915| view|428805|            0|
|2015-06-02 05:00:04|   339335| view| 82389|            0|
|2015-06-02 05:16:02|   176446| view| 10572|            0|
|2015-06-02 05:08:21|   929206| view|410676|            0|
|2015-06-02 05:50:29|    15795| view| 44872|            0|
|2015-06-02 05:41:37|   598426| view|156489|            

                                                                                

In [None]:
# Transform item_properties table into item_properties_dwh
# from_unixtime --> datetime (yyyy-MM-DD HH:mm:ss)
# Run in Cloud Jupyter Notebook

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# credentials_location = '/root/.google/credentials/google-creds.json'


conf = SparkConf() \
    .setAppName('item_properties') \
    .set("spark.jars", "/usr/lib/spark/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") 
#    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

project_id = "<project name>"
dataset_id = "project1"
table_source = "item_properties"

df = spark.read.format('bigquery') \
    .option("temporaryGcsBucket","dataproc-temp-asia-southeast2-212352110204-1oi7hped") \
    .option("project", project_id) \
    .option("dataset", dataset_id) \
    .load(table_source)
    
df.createOrReplaceTempView("temp_item_properties")

item_properties_transform = spark.sql("""
select from_unixtime((timestamp / 1000), "yyyy-MM-dd HH:mm:ss") as timestamp, 
    itemid, property, value
from temp_item_properties
""")

item_properties_transform.show()

project_id = "<project name>"
dataset_id = "project1"
table_target = "item_properties_dwh"
parttition_column = "DATE_FORMAT(timestamp, 'yyyy-MM')"
cluster_column = "property"

item_properties_transform.write \
    .format("bigquery") \
    .option("temporaryGcsBucket","dataproc-temp-asia-southeast2-212352110204-1oi7hped") \
    .option("table", f"{project_id}.{dataset_id}.{table_target}") \
    .option("PARTITION BY",  parttition_column) \
    .option("CLUSTER BY", cluster_column) \
    .mode('Overwrite') \
    .save()

# Stop Spark session
spark.stop()

24/04/01 01:29:59 INFO SparkEnv: Registering MapOutputTracker
24/04/01 01:29:59 INFO SparkEnv: Registering BlockManagerMaster
24/04/01 01:29:59 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/04/01 01:29:59 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

+-------------------+------+----------+--------------------+
|          timestamp|itemid|  property|               value|
+-------------------+------+----------+--------------------+
|2015-06-28 03:00:00|460429|categoryid|                1338|
|2015-09-06 03:00:00|206783|       888|1116713 960601 n2...|
|2015-08-09 03:00:00|395014|       400|n552.000 639502 n...|
|2015-05-10 03:00:00| 59481|       790|          n15360.000|
|2015-05-17 03:00:00|156781|       917|              828513|
|2015-07-05 03:00:00|285026| available|                   0|
|2015-06-14 03:00:00| 89534|       213|             1121373|
|2015-05-17 03:00:00|264312|         6|              319724|
|2015-06-07 03:00:00|229370|       202|             1330310|
|2015-06-14 03:00:00| 98113|       451|     1141052 n48.000|
|2015-08-09 03:00:00|450113|       888|1038400 45956 n50...|
|2015-06-28 03:00:00|244127|       400|n552.000 639502 n...|
|2015-08-16 03:00:00|264319|       227|      1283144 353870|
|2015-08-16 03:00:00|348

[Stage 1:>                                                          (0 + 2) / 2]