In [6]:
# Spark Schema Generator from PostgreSQL Table Schema

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType, BooleanType, TimestampType, DateType, DecimalType, ArrayType, MapType, BinaryType

import psycopg2

# Connect to the PostgreSQL database
conn = psycopg2.connect(host="localhost", database="dev_psg", user="admin", password="admin")

# Create a cursor object
cur = conn.cursor()

# Define the PostgreSQL schema name
schema_name = "public"

# Get the list of tables in the specified schema
cur.execute(f"SELECT table_name FROM information_schema.tables WHERE table_schema='{schema_name}'")
tables = cur.fetchall()

In [4]:

# Define a dictionary to map PostgreSQL data types to Spark data types
data_type_mapping = {
    "ARRAY": ArrayType(StringType()), # assuming array contains string elements
    "bigint": LongType(),
    "boolean": BooleanType(),
    "bytea": BinaryType(),
    "character": StringType(),
    "character varying": StringType(),
    "date": DateType(),
    "double precision": FloatType(),
    "integer": IntegerType(),
    "json": StringType(),
    "name": StringType(),
    "numeric": DecimalType(),
    "oid": LongType(),
    "smallint": IntegerType(),
    "text": StringType(),
    "timestamp with time zone": TimestampType(),
    "timestamp without time zone": TimestampType()
    #Add more data type conversion
}

In [None]:

# Define a function to get the StructType schema for a PostgreSQL table
def get_table_schema(table_name):
    # Get the table schema from PostgreSQL
    cur.execute(f"SELECT column_name, data_type, is_nullable FROM information_schema.columns WHERE table_name='{table_name}'")
    rows = cur.fetchall()

    # Create a list of StructFields from the table schema
    fields = []
    for row in rows:
        field_name = row[0]
        field_type = row[1]
        is_nullable = row[2]
        spark_data_type = data_type_mapping.get(field_type, StringType())
        
        # Handle special cases
        if field_type == "numeric":
            # Get the precision and scale from the PostgreSQL column definition
            cur.execute(f"SELECT numeric_precision, numeric_scale FROM information_schema.columns WHERE table_name='{table_name}' AND column_name='{field_name}'")
            precision, scale = cur.fetchone()
            # Create a DecimalType with the specified precision and scale
            spark_data_type = DecimalType(precision, scale)
        elif field_type.startswith("array"):
            # Get the element data type from the PostgreSQL column definition
            element_type = field_type[6:-1]
            element_spark_data_type = data_type_mapping.get(element_type, StringType())
            # Create an ArrayType with the element data type
            spark_data_type = ArrayType(element_spark_data_type)

        fields.append(StructField(field_name, spark_data_type, is_nullable == 'NO'))

    # Create a StructType schema from the list of StructFields
    return StructType(fields)

# Create a dictionary to store the table schemas for each table
schemas = {}
for table in tables:
    table_name = table[0]
    schemas[table_name] = get_table_schema(table_name)

# Print the schemas
for table_name, schema in schemas.items():
    print(f"Schema for {table_name}:")
    print(schema)
    print()

#Sample Output for the Schema and tables

Schema for _airbyte_raw_fd_business_hours:
StructType(List(StructField(_airbyte_ab_id,StringType,true),StructField(_airbyte_data,StringType,false),StructField(_airbyte_emitted_at,TimestampType,false)))

Schema for _airbyte_raw_fd_canned_response_folders:
StructType(List(StructField(_airbyte_ab_id,StringType,true),StructField(_airbyte_data,StringType,false),StructField(_airbyte_emitted_at,TimestampType,false)))

Schema for _airbyte_raw_fd_canned_responses:
StructType(List(StructField(_airbyte_ab_id,StringType,true),StructField(_airbyte_data,StringType,false),StructField(_airbyte_emitted_at,TimestampType,false)))

Schema for _airbyte_raw_fd_companies:
StructType(List(StructField(_airbyte_ab_id,StringType,true),StructField(_airbyte_data,StringType,false),StructField(_airbyte_emitted_at,TimestampType,false)))

Schema for _airbyte_raw_fd_tickets:
StructType(List(StructField(_airbyte_ab_id,StringType,true),StructField(_airbyte_data,StringType,false),StructField(_airbyte_emitted_at,TimestampType,false)))

Schema for fd_sla_policies_esca__ion_resolution_level2:
StructType(List(StructField(_airbyte_resolution_hashid,StringType,false),StructField(agent_ids,LongType,false),StructField(escalation_time,LongType,false),StructField(_airbyte_ab_id,StringType,false),StructField(_airbyte_emitted_at,TimestampType,false),StructField(_airbyte_normalized_at,TimestampType,false),StructField(_airbyte_level2_hashid,StringType,false)))

Schema for _airbyte_tmp_bts_fd_agents:
StructType(List(StructField(_airbyte_ab_id,StringType,true),StructField(_airbyte_data,StringType,false),StructField(_airbyte_emitted_at,TimestampType,false)))
