In [0]:
# Switch Spark's datetime parsing/formatting to the "LEGACY" policy.
# Per the Spark 3 migration guide this restores the pre-3.0 parser behaviour
# for date/timestamp strings (relevant when the CSVs below are schema-inferred).
spark.conf.set(
    "spark.sql.legacy.timeParserPolicy",
    "LEGACY",
)

In [0]:
import os
from datetime import datetime
from getpass import getpass

import pyspark.sql.functions as F
import snowflake.connector
from pyspark.sql.types import *

In [0]:
# Reader options shared by every CSV load in this notebook. They are passed to
# Spark's `.option(...)` as strings:
#   infer_schema         -> "inferSchema": let Spark derive column types
#   first_row_is_header  -> "header": first line holds the column names
#   delimiter            -> "sep": field separator
infer_schema, first_row_is_header, delimiter = "true", "true", ","

In [0]:
# Every source table is a CSV export in the same DBFS folder, so a single
# helper replaces the 16 copy-pasted reader chains and guarantees that all
# tables are read with identical options.
CSV_DIR = "/FileStore/tables"


def read_csv_table(table_name, base_dir=CSV_DIR):
    """Load ``<base_dir>/<table_name>.csv`` into a Spark DataFrame.

    Uses the module-level ``infer_schema``, ``first_row_is_header`` and
    ``delimiter`` reader options defined earlier in the notebook.

    Args:
        table_name: File stem of the CSV (e.g. ``"patients"``).
        base_dir: Folder containing the CSV files; defaults to ``CSV_DIR``.

    Returns:
        A Spark DataFrame backed by the CSV file.
    """
    return (
        spark.read.format("csv")
        .option("inferSchema", infer_schema)
        .option("header", first_row_is_header)
        .option("sep", delimiter)
        .load(f"{base_dir}/{table_name}.csv")
    )


# NOTE(review): inferSchema triggers an extra pass over each file; for large
# tables (e.g. observations) an explicit schema would make loads cheaper.
df_procedures = read_csv_table("procedures")
df_supplies = read_csv_table("supplies")
df_careplans = read_csv_table("careplans")
df_devices = read_csv_table("devices")
df_payer_transitions = read_csv_table("payer_transitions")
df_providers = read_csv_table("providers")
df_allergies = read_csv_table("allergies")
df_conditions = read_csv_table("conditions")
df_encounters = read_csv_table("encounters")
df_immunizations = read_csv_table("immunizations")
df_medications = read_csv_table("medications")
df_organizations = read_csv_table("organizations")
df_patients = read_csv_table("patients")
df_payers = read_csv_table("payers")
df_imaging_studies = read_csv_table("imaging_studies")
df_observations = read_csv_table("observations")

In [0]:
class SnowflakePipeline:
    """Small helper for moving Spark DataFrames to and from Snowflake.

    Keeps the connection parameters and opens a ``snowflake.connector``
    connection. That connection is used for metadata queries (``DESCRIBE`` /
    ``COUNT``) after a write. It also builds the option set for the Spark
    Snowflake connector (``spark.read/df.write.format("snowflake")``).
    """

    def __init__(self, account, user, password, warehouse, database, schema):
        self.account = account
        self.user = user
        self.password = password
        self.warehouse = warehouse
        self.database = database
        self.schema = schema
        self.connection = None
        self.connect_to_snowflake()

    def connect_to_snowflake(self):
        """Open (or reopen) the Python-connector connection.

        On failure the error is printed and ``self.connection`` is left as
        ``None``. Callers check ``self.connection`` before relying on it.

        Returns:
            The open connection, or ``None`` if connecting failed.
        """
        try:
            self.connection = snowflake.connector.connect(
                account=self.account,
                user=self.user,
                password=self.password,
                warehouse=self.warehouse,
                database=self.database,
                schema=self.schema,
            )
            print("Conecção feita com sucesso!")
        except Exception as i:
            print(f"Erro de conecção: {str(i)}")
            self.connection = None
        return self.connection

    def close(self):
        """Close the Python-connector connection, if one is open."""
        if self.connection is not None:
            try:
                self.connection.close()
            finally:
                self.connection = None

    def _spark_options(self):
        """Return the connection options shared by Spark reads and writes.

        Uses the ``sf``-prefixed option names, the same ones the write path
        has always used. Reads and writes therefore authenticate identically.
        """
        return {
            "sfURL": self.account + ".snowflakecomputing.com",
            "sfUser": self.user,
            "sfPassword": self.password,
            "sfDatabase": self.database,
            "sfSchema": self.schema,
            "sfWarehouse": self.warehouse,
        }

    def read_table(self, table_name, query=None, Full=False):
        """Read a Snowflake table (or query result) into a Spark DataFrame.

        Args:
            table_name: Table to read when ``Full`` is False.
            query: SQL to run instead of reading a table; required when
                ``Full`` is True.
            Full: When True, read the result of ``query`` instead of
                ``table_name``.

        Returns:
            The Spark DataFrame, or ``None`` if the read failed. The error is
            printed.
        """
        if not self.connection:
            # Best-effort reconnect. The Spark read below goes through the
            # Spark connector and does not use this connection directly.
            self.connect_to_snowflake()
        try:
            read_options = self._spark_options()
            if Full:
                if not query:
                    raise ValueError("query must be provided when Full=True")
                read_options["query"] = query
            else:
                read_options["dbtable"] = table_name
            return spark.read.format("snowflake").options(**read_options).load()
        except Exception as i:
            print("Erro ao ler tabela:", i)
            return None

    def write_table(self, df, table_name):
        """Overwrite ``table_name`` in Snowflake with ``df`` and summarise it.

        Args:
            df: Spark DataFrame to write.
            table_name: Destination table (overwritten if it exists).

        Returns:
            On success, a dict with ``time_to_create_table`` (``str`` of the
            write's timedelta), ``schema``, ``table_name``, ``num_columns`` and
            ``num_rows``. If the write or the metadata queries fail, an error
            string starting with ``"Erro a criar tabela:"``. ``None`` if no
            connection could be established.
        """
        if not self.connection:
            self.connect_to_snowflake()
        if not self.connection:
            return None
        try:
            start_time = datetime.now()
            (
                df.write.format("snowflake")
                .options(**self._spark_options())
                .option("dbtable", table_name)
                .mode("overwrite")
                .save()
            )
            elapsed_time = datetime.now() - start_time

            # NOTE(review): table_name is interpolated unquoted, exactly as it was
            # before. Quoting it would make Snowflake resolve the identifier
            # case-sensitively. Only pass trusted table names here.
            cursor = self.connection.cursor()
            try:
                cursor.execute(f"DESCRIBE TABLE {table_name}")
                num_columns = len(cursor.fetchall())  # one row per column
                cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
                num_rows = cursor.fetchone()[0]
            finally:
                # Always release the cursor, even if a metadata query fails.
                cursor.close()
        except Exception as i:
            return f"Erro a criar tabela: {str(i)}"
        return {
            "time_to_create_table": str(elapsed_time),
            "schema": self.schema,
            "table_name": table_name,
            "num_columns": num_columns,
            "num_rows": num_rows,
        }

In [0]:
# Connection settings come from the environment instead of being hard-coded,
# so secrets are not stored in the notebook or its exports. The password falls
# back to an interactive prompt when SNOWFLAKE_PASSWORD is not set.
# NOTE(review): the password previously committed in this cell should be
# rotated. On Databricks, dbutils.secrets.get(...) may be the preferred
# secret store; confirm for this workspace.
snowflake_handler = SnowflakePipeline(
    account=os.environ.get("SNOWFLAKE_ACCOUNT", "eirljxx-tm85236"),
    user=os.environ.get("SNOWFLAKE_USER", "JOAOBRASIL"),
    password=os.environ.get("SNOWFLAKE_PASSWORD") or getpass("Snowflake password: "),
    warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
    database=os.environ.get("SNOWFLAKE_DATABASE", "TRABALHO_FINAL"),
    schema=os.environ.get("SNOWFLAKE_SCHEMA", "TF_PART_1"),
)

Conecção feita com sucesso!


In [0]:
# Overwrite the Snowflake table and display the load summary returned by write_table.
snowflake_handler.write_table(df=df_observations, table_name="df_observations")

Out[39]: {'time_to_create_table': '0:00:48.956178',
 'schema': 'TF_PART_1',
 'table_name': 'df_observations',
 'num_columns': 8,
 'num_rows': 1659750}

In [0]:
# Overwrite the Snowflake table and display the load summary returned by write_table.
snowflake_handler.write_table(df=df_patients, table_name="df_patients")

Out[40]: {'time_to_create_table': '0:00:09.357874',
 'schema': 'TF_PART_1',
 'table_name': 'df_patients',
 'num_columns': 25,
 'num_rows': 12352}

In [0]:
# Overwrite the Snowflake table and display the load summary returned by write_table.
snowflake_handler.write_table(df=df_conditions, table_name="df_conditions")

Out[41]: {'time_to_create_table': '0:00:10.158682',
 'schema': 'TF_PART_1',
 'table_name': 'df_conditions',
 'num_columns': 6,
 'num_rows': 114544}