## BigQuery To BigQuery

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, StringType

# Spark oturumu oluşturma
spark = SparkSession.builder.appName("TaxiTrip").config("spark.packages.jars", "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar").getOrCreate()
spark.conf.set("temporaryGcsBucket", "deneme12121")

df = spark.read.format("bigquery") \
    .option("table", "scenic-parity-429506-b8.deneme12121.yellow_tripdata") \
    .load()

24/08/14 17:43:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:

# Sütun adlarını yeniden adlandırma ve veri türlerini dönüştürme
df_processed = df.withColumnRenamed("VendorID", "vendor_id") \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

In [3]:

# 1. Veri Temizleme
# Eksik verileri filtreleme (örneğin, fare_amount ve trip_distance için)
# fare_amount ve trip_distance sütunlarında eksik değerleri filtreleme yani bana null olmayanları getir
df_clean = df_processed.filter(F.col("fare_amount").isNotNull() & F.col("trip_distance").isNotNull())


# Uç değerleri filtreleme (örneğin, fare_amount ve trip_distance için belirli bir aralık)
df_filtered = df_clean.filter((F.col("fare_amount") > 0) & (F.col("fare_amount") < 1000) &
                              (F.col("trip_distance") > 0) & (F.col("trip_distance") < 100))


In [4]:
# 2. Özet İstatistikler
df_filtered.describe().show()

                                                                                

+-------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+----------------+------------------+---------------------+------------------+--------------------+-------------------+------------------+
|summary|         vendor_id|   passenger_count|     trip_distance|        RatecodeID|store_and_fwd_flag|      PULocationID|     DOLocationID|      payment_type|      fare_amount|             extra|            mta_tax|      tip_amount|      tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        Airport_fee| __index_level_0__|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+----------------+------------------+---------------------+---------------

In [5]:

# 3. Zaman Serisi Analizi
# Günlük ortalama toplam tutar
daily_summary = df_filtered.groupBy(F.to_date("pickup_datetime").alias("date")) \
    .agg(F.avg("total_amount").alias("avg_total_amount"))

# En yoğun günler (top 10 gün)
top_days = daily_summary.orderBy(F.col("avg_total_amount").desc()).limit(10)
top_days.show()


[Stage 3:>                                                          (0 + 1) / 1]

+----------+------------------+
|      date|  avg_total_amount|
+----------+------------------+
|2002-12-31| 70.13499999999999|
|2024-06-01|            37.065|
|2024-04-30|            35.626|
|2009-01-01|            33.765|
|2024-05-06| 30.84228233507784|
|2024-05-13|30.515392638150246|
|2024-05-28|30.369016093491435|
|2024-05-20|29.994445413661776|
|2024-05-15| 29.99284061106243|
|2024-05-16|29.824663989674004|
+----------+------------------+



                                                                                

In [7]:
# İşlenmiş veriyi BigQuery'ye yazma
# temporaryGcsBucket -> geçici dosyaların saklanacağı Cloud Storage kovasını belirtir
# createDisposition -> tablo yoksa oluşturulacak (CREATE_IF_NEEDED, CREATE_NEVER, CREATE_EMPTY, CREATE_IF_EMPTY)
# writeDisposition -> tablo varsa verilerin üzerine yazılacak
# table -> verilerin yazılacağı tablo adı
df_filtered.write.format("bigquery") \
    .option("temporaryGcsBucket",  "deneme12121") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .option("table", "deneme12121.yellow_tripdata_2") \
    .save()
print("Veri başarıyla BigQuery'e yazıldı.")

                                                                                

Veri başarıyla BigQuery'e yazıldı.


## Google Cloud Storage to BigQuery

In [6]:
# GCS gs://dataiku1/fhv_tripdata_2024-05.parquet dosyasını oku ve df'ye ata ardından df'yi bigquerydeki stream.employees tablosuna yaz
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg


# Spark oturumu oluştur
spark = SparkSession.builder.appName("TaxiTrip").getOrCreate()

# Google Cloud Storage Bucket'ını belirle
bucket = "deneme12121"
spark.conf.set("temporaryGcsBucket", bucket)

# Parquet dosyasını oku
df = spark.read.parquet("gs://deneme12121/yellow_tripdata_2024-05.parquet")
df.printSchema()


# Analitik işlem: Her bir passenger_count için toplam trip_distance ve ortalama total_amount hesapla
analytics_df = df.groupBy("passenger_count") \
    .agg(
        sum("trip_distance").alias("total_trip_distance"),
        avg("total_amount").alias("avg_total_amount")
    )

# Analitik sonucu göster
analytics_df.show()


analytics_df.write.format("bigquery") \
    .option("temporaryGcsBucket",  "deneme12121") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .option("table", "deneme12121.yellow_tripdata_3") \
    .save()
print("Veri başarıyla BigQuery'e yazıldı.")


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- __index_level_0__: long (nullable = true)



                                                                                

+---------------+-------------------+------------------+
|passenger_count|total_trip_distance|  avg_total_amount|
+---------------+-------------------+------------------+
|            8.0|  48.92999999999999|           143.214|
|            0.0|  51464.39000000012|25.546128640776864|
|            7.0|                4.0| 73.63333333333333|
|           null|  3894544.979999995| 25.70265168261624|
|            1.0|  4226768.250000223|27.873683610054012|
|            4.0| 121472.67999999993| 32.31536421307662|
|            3.0| 203744.35999999917|30.729450231695576|
|            2.0|  984922.3199999966|31.675241127995747|
|            6.0| 31032.730000000054|27.722026431718117|
|            5.0| 50388.300000000134|28.207172359015164|
|            9.0|              41.18|            107.22|
+---------------+-------------------+------------------+



                                                                                

Veri başarıyla BigQuery'e yazıldı.
