In [1]:
from pyspark.sql import SparkSession
import os
# Relative path to the GCP service-account key file. Defined once so the
# environment variable and the Hadoop connector setting below always agree.
service_key_path = 'secrets/serviceKey.json'

# Export the key path via the GOOGLE_APPLICATION_CREDENTIALS environment variable
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = service_key_path


# JAR paths for BigQuery and GCS connectors
bigquery_connector_jar = "spark-bigquery-connector.jar"
gcs_connector_jar = "gcs-connector.jar"


# Create SparkSession with both connectors
spark = (
    SparkSession.builder
    .appName("PySpark with BigQuery and GCS")
    # Both connector JARs are passed as a single comma-separated value
    .config("spark.jars", ",".join([bigquery_connector_jar, gcs_connector_jar]))
    .config("spark.sql.catalog.spark_bigquery", "com.google.cloud.spark.bigquery.BigQueryCatalog")
    # Same key file as the environment variable above
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", service_key_path)
    .config("spark.bigquery.projectId", "idmpproject-441123")
    .getOrCreate()
)

# Last expression of the cell: display the session object
spark

24/11/20 15:32:45 WARN Utils: Your hostname, sabaris-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.110.50.201 instead (on interface en0)
24/11/20 15:32:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/11/20 15:32:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/20 15:32:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Components of the fully qualified BigQuery table name: <project>.<dataset>.<table>
project_id = "idmpproject-441123"
dataset_id = "uberFareEstimation"
table_name = "uber_data"
bigquery_table = ".".join((project_id, dataset_id, table_name))

# Load the table into a Spark DataFrame using the "bigquery" data source
df = (
    spark.read
    .format("bigquery")
    .option("table", bigquery_table)
    .load()
)

# Preview the loaded rows
df.show()


                                                                                

+--------+--------+----------------+------------------+-------------+-----+----------------+------+--------+
|distance|cab_type|      time_stamp|       destination|       source|price|surge_multiplier|    id|    name|
+--------+--------+----------------+------------------+-------------+-----+----------------+------+--------+
|    0.94|    Uber|2018-11-28T23:00|         North End|North Station|  4.5|             1.0| 39765|UberPool|
|    0.94|    Uber|2018-12-14T19:00|         North End|North Station|  4.5|             1.0|437984|UberPool|
|    0.63|    Uber|2018-11-27T21:00|Financial District|South Station|  4.5|             1.0|  1644|UberPool|
|    0.63|    Uber|2018-12-15T15:00|Financial District|South Station|  4.5|             1.0| 10780|UberPool|
|    0.63|    Uber|2018-12-15T13:00|Financial District|South Station|  4.5|             1.0| 21598|UberPool|
|    0.63|    Uber|2018-11-28T20:00|Financial District|South Station|  4.5|             1.0| 25567|UberPool|
|    0.63|    Uber|

In [3]:
# Print the DataFrame schema as a tree: column name, type and nullability
df.printSchema()

root
 |-- distance: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- source: string (nullable = true)
 |-- price: double (nullable = true)
 |-- surge_multiplier: double (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [4]:
# DataFrame shape: column count from the column list, row count from count()
num_columns = len(df.columns)
num_rows = df.count()

# Report both dimensions on one line
print(f"Number of rows: {num_rows}, Number of columns: {num_columns}")


Number of rows: 330568, Number of columns: 9


In [6]:
# Keep rows whose destination is "North End" and whose price exceeds 50
df_filtered1 = df.where(
    (df["destination"] == "North End")
    & (df["price"] > 50)
)

In [7]:
# Preview the filtered rows
df_filtered1.show()

+--------+--------+----------------+-----------+------------------+-----+----------------+------+---------+
|distance|cab_type|      time_stamp|destination|            source|price|surge_multiplier|    id|     name|
+--------+--------+----------------+-----------+------------------+-----+----------------+------+---------+
|    1.57|    Uber|2018-11-28T11:00|  North End|  Theatre District| 52.0|             1.0|419824|Black SUV|
|    2.43|    Uber|2018-12-01T17:00|  North End|       Beacon Hill| 52.0|             1.0|282138|Black SUV|
|    1.08|    Uber|2018-12-14T04:00|  North End|     North Station| 56.0|             1.0| 79077|Black SUV|
|    7.34|    Uber|2018-12-01T00:00|  North End|          Back Bay| 58.0|             1.0|630617|Black SUV|
|     2.7|    Uber|2018-11-28T12:00|  North End|          Back Bay| 58.0|             1.0|532066|Black SUV|
|     3.4|    Uber|2018-11-27T00:00|  North End|          Back Bay| 64.0|             1.0|511471|Black SUV|
|    7.34|    Uber|2018-12-0