In [1]:
import os
import pyspark.sql.functions as F

from pyspark.sql import SparkSession



# Path to the Google Cloud credentials file.
credentials_path = "/home/jovyan/work/desafio-stone-439013-cd4f41db04c5.json"

# Publish the credentials path through the GOOGLE_APPLICATION_CREDENTIALS
# environment variable.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

# Maven coordinates passed to spark.jars.packages: the BigQuery connector and
# the PostgreSQL JDBC driver.
spark_packages = ",".join(
    [
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0",
        "org.postgresql:postgresql:42.2.23",
    ]
)

# Build the Spark session step by step, then create (or reuse) it.
builder = SparkSession.builder.appName('BigQuery Crypto Ethereum')
builder = builder.config('spark.jars.packages', spark_packages)
# NOTE(review): this key is deprecated as of Spark 3.0 in favour of
# "spark.sql.execution.arrow.pyspark.enabled" - confirm the Spark version
# in use before switching.
builder = builder.config("spark.sql.execution.arrow.enabled", "true")
spark = builder.getOrCreate()

# Last expression of the cell: display the session.
spark

In [2]:
# Read the tokens table from BigQuery, keeping only rows whose
# block_timestamp falls on 2024-10-16.
project_id = "desafio-stone-439013"
bigquery_table = "bigquery-public-data.crypto_ethereum.tokens"
df = (
    spark.read.format("bigquery")
    .options(
        project=project_id,
        table=bigquery_table,
        filter="DATE(block_timestamp) = '2024-10-16'",
    )
    .load()
)

# Display the data
df.show()

+--------------------+--------+-------------------+--------+--------------------+-------------------+------------+--------------------+
|             address|  symbol|               name|decimals|        total_supply|    block_timestamp|block_number|          block_hash|
+--------------------+--------+-------------------+--------+--------------------+-------------------+------------+--------------------+
|0x06f129ba7797790...| ROBODOG|A Dog Made of Metal|       2|10000000000000000...|2024-10-16 23:43:59|    20981480|0x658f5dee851f142...|
|0xacb86779718b20b...| ROBODOG|A Dog Made of Metal|       2|10000000000000000...|2024-10-16 23:33:59|    20981430|0x80a02a1dab761c5...|
|0xf3bdecf96f810a7...|ROBOShib|A Dog Made of Steel|       2|          1000000000|2024-10-16 23:47:59|    20981500|0xa44b0553ddb28cb...|
|0x6289ee8ad48cadd...|        |                   |       8|                   0|2024-10-16 04:31:59|    20975753|0x2e16805ce0c3480...|
+--------------------+--------+-----------------

In [6]:
# Cast each listed column to string, one column at a time, keeping the
# column names and their order unchanged.
for column_name in (
    "address",
    "symbol",
    "name",
    "decimals",
    "total_supply",
    "block_timestamp",
    "block_number",
    "block_hash",
):
    df = df.withColumn(column_name, F.col(column_name).cast("string"))

df.show()

+--------------------+--------+-------------------+--------+--------------------+-------------------+------------+--------------------+
|             address|  symbol|               name|decimals|        total_supply|    block_timestamp|block_number|          block_hash|
+--------------------+--------+-------------------+--------+--------------------+-------------------+------------+--------------------+
|0x06f129ba7797790...| ROBODOG|A Dog Made of Metal|       2|10000000000000000...|2024-10-16 23:43:59|    20981480|0x658f5dee851f142...|
|0xacb86779718b20b...| ROBODOG|A Dog Made of Metal|       2|10000000000000000...|2024-10-16 23:33:59|    20981430|0x80a02a1dab761c5...|
|0xf3bdecf96f810a7...|ROBOShib|A Dog Made of Steel|       2|          1000000000|2024-10-16 23:47:59|    20981500|0xa44b0553ddb28cb...|
|0x6289ee8ad48cadd...|        |                   |       8|                   0|2024-10-16 04:31:59|    20975753|0x2e16805ce0c3480...|
+--------------------+--------+-----------------

In [8]:
# PostgreSQL connection settings.
# The user and password can be supplied through the POSTGRES_USER and
# POSTGRES_PASSWORD environment variables so that real credentials do not have
# to be stored in the notebook. The previous literal values are kept only as
# fallbacks, so the cell behaves as before when the variables are unset.
postgres_url = "jdbc:postgresql://10.106.35.153:5432/crypto_ethereum"
postgres_properties = {
    "user": os.environ.get("POSTGRES_USER", "admin"),
    "password": os.environ.get("POSTGRES_PASSWORD", "admin"),
    "driver": "org.postgresql.Driver",
}

# Write the DataFrame to the public.crypto_tokens table over JDBC.
# NOTE(review): mode="append" adds the rows again each time this cell is
# re-run - confirm that duplicates are acceptable or de-duplicate downstream.
df.write.jdbc(
    url=postgres_url,
    table="public.crypto_tokens",
    mode="append",
    properties=postgres_properties,
)