In [None]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from google.cloud import bigquery

from config import EXCHANGE_TOPIC, CREDENTIALS_FILE, GCS_BUCKET, PROJECT_ID

In [None]:
# Expose the service-account key through the standard Application Default
# Credentials variable, so Google client libraries in this kernel pick it up.
os.environ.update(GOOGLE_APPLICATION_CREDENTIALS=CREDENTIALS_FILE)

# Pins the Kafka connector package (spark.jars.packages) to the running PySpark version.
pyspark_version = pyspark.__version__

In [None]:
# BigQuery destination, written as `{PROJECT_ID}.{dataset_id}.{table_id}`.
dataset_id, table_id = "assets_v2", "exchange_all"


In [None]:
# Local Spark configuration:
#  - the GCS connector jar plus service-account auth, because the BigQuery sink
#    stages data in a GCS bucket (temporaryGcsBucket);
#  - the Kafka source and BigQuery sink packages. The Kafka package is pinned to
#    the running PySpark version; the `_2.12` suffix assumes a Scala 2.12 Spark
#    build — NOTE(review): confirm for the installed Spark.
# The connector jar location was a hard-coded Homebrew path; it can now be
# overridden with the GCS_CONNECTOR_JAR environment variable (same default).
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('CryptoCurrencyConsumer') \
    .set("spark.jars", os.environ.get("GCS_CONNECTOR_JAR", "/opt/homebrew/lib/gcs-connector-hadoop3-2.2.5.jar")) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", CREDENTIALS_FILE) \
    .set("spark.jars.packages", f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version},com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.15.1-beta")

In [None]:
# Reuse the kernel's active SparkContext when this cell is re-executed:
# constructing SparkContext(conf=conf) a second time raises
# "Cannot run multiple SparkContexts at once".
# NOTE: if a context is already running, getOrCreate returns it and `conf` is ignored.
sc = SparkContext.getOrCreate(conf=conf)

# Register the GCS connector as the handler for gs:// paths and authenticate it
# with the service-account key. `_jsc` is a private PySpark attribute, used here
# only to reach the underlying Hadoop configuration.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", CREDENTIALS_FILE)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

In [None]:
# DataFrame/SQL entry point built on the configured SparkContext;
# getOrCreate() hands back the active session if one already exists.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [None]:
# Render a DataFrame as an HTML table when it is the last expression in a cell.
spark.conf.set(key="spark.sql.repl.eagerEval.enabled", value=True)

In [None]:
# Batch (non-streaming) read of every message retained in the exchange topic,
# starting from the earliest offset. The topic now comes from the shared config
# (EXCHANGE_TOPIC, previously imported but unused) instead of a duplicated
# literal, so it cannot drift from the configured value.
# Bootstrap servers default to localhost:9092 and can be overridden with the
# KAFKA_BOOTSTRAP_SERVERS environment variable.
exchange = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")) \
    .option("subscribe", EXCHANGE_TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()

In [None]:
from pyspark.sql import types

# Schema of one JSON message on the exchange topic. Every field, including the
# numeric-looking ones, is parsed as a nullable string — presumably the producer
# serialises numbers as strings; confirm before casting downstream.
exchange_spark_schema = types.StructType([
    types.StructField(field_name, types.StringType(), True)
    for field_name in (
        "exchangeId",
        "name",
        "rank",
        "percentTotalVolume",
        "volumeUsd",
        "tradingPairs",
        "socket",
        "exchangeUrl",
        "updated",
        "timestamp",
    )
])

In [None]:
from pyspark.sql import functions as F

# Kafka delivers the payload in the binary `value` column: cast it to a string,
# parse it with `exchange_spark_schema`, and expand the resulting struct into
# top-level columns.
# The alias is attached to the parsed *column* (it was previously on the
# DataFrame). The struct can then be expanded as `exchange.*` instead of
# relying on Spark's auto-generated name `from_json(CAST(value AS STRING))`.
# The result goes into a new name rather than overwriting `exchange`, so
# re-running this cell does not fail on a missing `value` column.
exchange_parsed = exchange \
  .select(F.from_json(F.col("value").cast("STRING"), exchange_spark_schema).alias("exchange")) \
  .select("exchange.*")

# Stamp each row with the time Spark processed it; the BigQuery write below
# partitions the table on this column.
exchange_ts = exchange_parsed.withColumn("processing_timestamp", F.current_timestamp())

In [None]:
# Runs the Kafka batch read and prints the first 20 rows with full column values.
exchange_ts.show(n=20, truncate=False)

In [None]:
# Append the parsed rows to BigQuery. The connector stages the data in
# GCS_BUCKET first; the target table is partitioned on processing_timestamp
# and clustered on exchangeId.
(
    exchange_ts.write
    .format("bigquery")
    .options(**{
        "table": f"{PROJECT_ID}.{dataset_id}.{table_id}",
        "partitionField": "processing_timestamp",
        "clusteredFields": "exchangeId",
        "temporaryGcsBucket": GCS_BUCKET,
    })
    .mode("append")
    .save()
)

In [None]:
# Shut down the Spark session and its underlying SparkContext, releasing local resources.
spark.stop()