In [None]:
# `%pip` (rather than `!pip`) installs into the environment of the running kernel.
# TODO: pin the connector version this notebook was validated against.
%pip install snowflake-connector-python



In [None]:
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict

import pandas as pd
import snowflake.connector

In [None]:
# Snowflake connection parameters.
# Every setting is read from an environment variable of the same name so that
# credentials never get committed with the notebook. Non-secret settings fall
# back to this project's defaults.
SNOWFLAKE_ACCOUNT = os.environ.get('SNOWFLAKE_ACCOUNT', 'bw02678.ap-southeast-1')
SNOWFLAKE_USER = os.environ.get('SNOWFLAKE_USER', 'SANTHOSHKRISH03')
# The password has NO default: export SNOWFLAKE_PASSWORD in the shell that
# launches Jupyter (or load it from a secrets manager). If it is unset this is
# None and connecting will fail with an authentication error.
# NOTE(review): a password was previously hardcoded here; treat it as leaked and rotate it.
SNOWFLAKE_PASSWORD = os.environ.get('SNOWFLAKE_PASSWORD')
SNOWFLAKE_WAREHOUSE = os.environ.get('SNOWFLAKE_WAREHOUSE', 'MUSIC_ANALYTICS_WH')
SNOWFLAKE_DATABASE = os.environ.get('SNOWFLAKE_DATABASE', 'MUSIC_ANALYTICS')
SNOWFLAKE_SCHEMA = os.environ.get('SNOWFLAKE_SCHEMA', 'RAW_DATA')

In [None]:
def connect_to_snowflake():
    """Open a new Snowflake connection from the module-level SNOWFLAKE_* settings.

    Returns:
        The connection object returned by ``snowflake.connector.connect``.
        This function does not close it; the caller owns its lifetime.
    """
    connection_kwargs = dict(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA,
    )
    return snowflake.connector.connect(**connection_kwargs)

In [None]:
def execute_query(conn, query, params=None):
    """Execute one SQL statement on ``conn`` and commit it.

    Args:
        conn: An open DB-API connection (a Snowflake connection in this notebook).
        query: SQL text, optionally containing ``%s`` placeholders.
        params: Optional sequence of values bound to the placeholders.

    The cursor is closed even when the statement raises, so a failing query
    does not leak a cursor on the shared connection. Nothing is committed if
    the statement fails.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query, params)
        conn.commit()
    finally:
        cursor.close()

In [None]:
def create_mdm_tables(conn):
    """Create the master-data tables (artists, songs, venues) if missing.

    The statements run one at a time, in list order. ``master_songs`` declares
    a FOREIGN KEY to ``master_artists``, so the referenced table is created
    first. Submitting the DDL to a thread pool over a single shared connection
    made that order nondeterministic and bought nothing for three statements.

    Args:
        conn: An open Snowflake connection.
    """
    ddl_statements = [
        """
        CREATE TABLE IF NOT EXISTS master_artists (
            artist_id VARCHAR(36) DEFAULT UUID_STRING(),
            artist_name VARCHAR(255) NOT NULL,
            spotify_id VARCHAR(255),
            twitter_handle VARCHAR(255),
            created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
            updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
            PRIMARY KEY (artist_id),
            UNIQUE (artist_name)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS master_songs (
            song_id VARCHAR(36) DEFAULT UUID_STRING(),
            song_name VARCHAR(255) NOT NULL,
            artist_id VARCHAR(36),
            album_name VARCHAR(255),
            spotify_id VARCHAR(255),
            created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
            updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
            PRIMARY KEY (song_id),
            UNIQUE (spotify_id),
            FOREIGN KEY (artist_id) REFERENCES master_artists(artist_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS master_venues (
            venue_id VARCHAR(36) DEFAULT UUID_STRING(),
            venue_name VARCHAR(255) NOT NULL,
            location VARCHAR(255),
            created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
            updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
            PRIMARY KEY (venue_id),
            UNIQUE (venue_name)
        )
        """
    ]

    # Sequential on purpose: master_artists must exist before master_songs references it.
    for ddl in ddl_statements:
        execute_query(conn, ddl)

In [None]:
def batch_upsert_artists(conn, artists_data):
    """Upsert artists into ``master_artists``, matching on ``artist_name``.

    Args:
        conn: An open Snowflake connection.
        artists_data: Iterable of ``(artist_name, spotify_id, twitter_handle)``
            tuples, passed to ``cursor.executemany`` against a single-row MERGE.

    Existing non-NULL ``spotify_id`` / ``twitter_handle`` values are kept
    (``COALESCE`` prefers the target), so matched rows only get gaps filled.
    An empty input is a no-op: no cursor is opened and nothing is committed.
    The cursor is closed even if a statement fails.
    """
    rows = list(artists_data)
    if not rows:
        return

    query = """
    MERGE INTO master_artists AS target
    USING (
        SELECT column1 as artist_name, column2 as spotify_id, column3 as twitter_handle
        FROM VALUES (%s, %s, %s)
    ) AS source
    ON target.artist_name = source.artist_name
    WHEN MATCHED THEN
        UPDATE SET
            spotify_id = COALESCE(target.spotify_id, source.spotify_id),
            twitter_handle = COALESCE(target.twitter_handle, source.twitter_handle),
            updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (artist_name, spotify_id, twitter_handle)
        VALUES (source.artist_name, source.spotify_id, source.twitter_handle)
    """

    cursor = conn.cursor()
    try:
        cursor.executemany(query, rows)
        conn.commit()
    finally:
        cursor.close()


In [None]:
def update_mdm_from_spotify(conn):
    """Merge raw Spotify and concert data into the master tables, then commit once.

    Three MERGE statements run in this order on one cursor:

    1. ``master_artists`` from ``RAW_DATA.SPOTIFY_TRACKS.artists``: inserts new
       artists and fills a NULL/'UNKNOWN' ``spotify_id`` with the first
       ``track_id`` (by ``ORDER BY track_id``) seen for that artist.
    2. ``master_songs`` from ``RAW_DATA.SPOTIFY_TRACKS`` joined to
       ``master_artists``: one source row per ``track_id``. This runs after (1)
       so newly inserted artists can be joined.
    3. ``master_venues`` from ``RAW_DATA.CONCERTS``: one source row per venue.

    Args:
        conn: An open Snowflake connection.

    The cursor is closed even if a statement fails. In that case nothing is
    committed here.
    """
    artists_merge = """
    MERGE INTO master_artists AS target
    USING (
        SELECT DISTINCT
            artists AS artist_name,
            FIRST_VALUE(track_id) OVER (PARTITION BY artists ORDER BY track_id) AS spotify_id
        FROM RAW_DATA.SPOTIFY_TRACKS
        WHERE artists IS NOT NULL
    ) AS source
    ON target.artist_name = source.artist_name
    WHEN MATCHED THEN
        UPDATE SET
            spotify_id = CASE
                WHEN target.spotify_id IS NULL OR target.spotify_id = 'UNKNOWN'
                THEN source.spotify_id
                ELSE target.spotify_id
            END,
            updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (artist_name, spotify_id)
        VALUES (source.artist_name, source.spotify_id)
    """

    songs_merge = """
    MERGE INTO master_songs AS target
    USING (
        SELECT DISTINCT
            COALESCE(s.track_name, 'UNKNOWN') AS song_name,
            a.artist_id,
            COALESCE(s.album_name, 'UNKNOWN') AS album_name,
            s.track_id AS spotify_id
        FROM RAW_DATA.SPOTIFY_TRACKS s
        JOIN master_artists a ON a.artist_name = s.artists
        WHERE s.track_name IS NOT NULL AND s.artists IS NOT NULL
        QUALIFY ROW_NUMBER() OVER (PARTITION BY s.track_id ORDER BY s.track_name) = 1
    ) AS source
    ON target.spotify_id = source.spotify_id
    WHEN MATCHED THEN
        UPDATE SET
            song_name = CASE WHEN target.song_name = 'UNKNOWN' THEN source.song_name ELSE target.song_name END,
            artist_id = source.artist_id,
            album_name = CASE WHEN target.album_name = 'UNKNOWN' THEN source.album_name ELSE target.album_name END,
            updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (song_name, artist_id, album_name, spotify_id)
        VALUES (source.song_name, source.artist_id, source.album_name, source.spotify_id)
    """

    # A venue may appear with several Location values. With SELECT DISTINCT
    # that produced several source rows per venue: Snowflake rejects such a
    # MERGE as nondeterministic on UPDATE, and on INSERT it would create
    # duplicate venue_name rows (UNIQUE is not enforced). GROUP BY collapses
    # each venue to one row. MAX() skips NULLs, so a real location wins when one exists.
    venues_merge = """
    MERGE INTO master_venues AS target
    USING (
        SELECT
            Venue AS venue_name,
            MAX(Location) AS location
        FROM RAW_DATA.CONCERTS
        WHERE Venue IS NOT NULL
        GROUP BY Venue
    ) AS source
    ON target.venue_name = source.venue_name
    WHEN MATCHED THEN
        UPDATE SET
            location = CASE
                WHEN target.location IS NULL OR target.location = 'UNKNOWN'
                THEN source.location
                ELSE target.location
            END,
            updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (venue_name, location)
        VALUES (source.venue_name, source.location)
    """

    cursor = conn.cursor()
    try:
        # Order matters: the songs MERGE joins on master_artists.
        for statement in (artists_merge, songs_merge, venues_merge):
            cursor.execute(statement)
        conn.commit()
    finally:
        cursor.close()

In [None]:
def create_data_quality_table(conn):
    """Ensure the ``data_quality_issues`` table exists (no-op if it already does).

    Each row holds one profiling finding: source table and column, issue type,
    a description and the affected-record count. It also gets a generated
    UUID key and a creation timestamp.
    """
    execute_query(
        conn,
        """
    CREATE TABLE IF NOT EXISTS data_quality_issues (
        issue_id VARCHAR(36) DEFAULT UUID_STRING(),
        source_table VARCHAR(255),
        column_name VARCHAR(255),
        issue_type VARCHAR(255),
        issue_description VARCHAR(1000),
        record_count INT,
        created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
        PRIMARY KEY (issue_id)
    )
    """,
    )


In [None]:
def profile_data(conn, table_name: str):
    """Record NULL and duplicate-value findings for every column of one table.

    For each column this inserts up to two rows into ``data_quality_issues``.
    A row is only inserted when its count is positive.

    * ``NULL_VALUES``: rows where the column is NULL.
    * ``DUPLICATE_VALUES``: non-NULL values that repeat another value, i.e.
      ``COUNT(col) - COUNT(DISTINCT col)``. ``COUNT(*)`` is wrong here because
      ``COUNT(DISTINCT col)`` ignores NULLs, so every NULL row would be
      reported as a duplicate.

    All statements run sequentially on one cursor and are committed together.

    Args:
        conn: An open Snowflake connection.
        table_name: ``table`` (resolved in the session's current schema),
            ``schema.table`` or ``database.schema.table``. It is interpolated
            into SQL as an identifier, so it must come from trusted code.
    """
    name_parts = table_name.split('.')
    columns_view = (
        f"{name_parts[-3]}.INFORMATION_SCHEMA.COLUMNS"
        if len(name_parts) >= 3
        else "INFORMATION_SCHEMA.COLUMNS"
    )

    cursor = conn.cursor()
    try:
        if len(name_parts) >= 2:
            cursor.execute(
                f"SELECT COLUMN_NAME FROM {columns_view} "
                "WHERE TABLE_NAME = UPPER(%s) AND TABLE_SCHEMA = UPPER(%s)",
                (name_parts[-1], name_parts[-2]),
            )
        else:
            # Restrict to the current schema: that is where an unqualified FROM resolves.
            cursor.execute(
                f"SELECT COLUMN_NAME FROM {columns_view} "
                "WHERE TABLE_NAME = UPPER(%s) AND TABLE_SCHEMA = CURRENT_SCHEMA()",
                (name_parts[-1],),
            )
        columns = [row[0] for row in cursor.fetchall()]

        for column_name in columns:
            # Quote the identifier exactly as INFORMATION_SCHEMA reports it so
            # mixed-case (originally quoted) column names still resolve.
            col = '"' + column_name.replace('"', '""') + '"'
            labels = (table_name, column_name)

            cursor.execute(f"""
            INSERT INTO data_quality_issues (source_table, column_name, issue_type, issue_description, record_count)
            SELECT %s, %s, 'NULL_VALUES', 'Column contains null values', COUNT(*)
            FROM {table_name}
            WHERE {col} IS NULL
            HAVING COUNT(*) > 0
            """, labels)

            cursor.execute(f"""
            INSERT INTO data_quality_issues (source_table, column_name, issue_type, issue_description, record_count)
            SELECT %s, %s, 'DUPLICATE_VALUES', 'Column contains duplicate values', COUNT({col}) - COUNT(DISTINCT {col})
            FROM {table_name}
            HAVING COUNT({col}) - COUNT(DISTINCT {col}) > 0
            """, labels)

        conn.commit()
    finally:
        cursor.close()

In [None]:
def profile_data_batch(conn, tables):
    """Record NULL and duplicate-value findings for every column of several tables.

    For each table the column list is read from INFORMATION_SCHEMA. For each
    column, up to two rows are inserted into ``data_quality_issues``. A row is
    only inserted when its count is positive.

    * ``NULL_VALUES``: rows where the column is NULL.
    * ``DUPLICATE_VALUES``: non-NULL values that repeat another value, i.e.
      ``COUNT(col) - COUNT(DISTINCT col)``. ``COUNT(*)`` is wrong here because
      ``COUNT(DISTINCT col)`` ignores NULLs, so every NULL row would be
      reported as a duplicate.

    Everything runs on one cursor and is committed once at the end. The cursor
    is closed even if a statement fails.

    Args:
        conn: An open Snowflake connection.
        tables: Iterable of names, each ``table`` (current schema),
            ``schema.table`` or ``database.schema.table``. Names are
            interpolated into SQL as identifiers, so they must come from
            trusted code.
    """
    cursor = conn.cursor()
    try:
        for table in tables:
            name_parts = table.split('.')
            columns_view = (
                f"{name_parts[-3]}.INFORMATION_SCHEMA.COLUMNS"
                if len(name_parts) >= 3
                else "INFORMATION_SCHEMA.COLUMNS"
            )
            # The schema is the second-to-last part (not parts[0], which is
            # the database for three-part names). Unqualified names resolve
            # in the current schema.
            if len(name_parts) >= 2:
                cursor.execute(
                    f"SELECT COLUMN_NAME FROM {columns_view} "
                    "WHERE TABLE_NAME = UPPER(%s) AND TABLE_SCHEMA = UPPER(%s)",
                    (name_parts[-1], name_parts[-2]),
                )
            else:
                cursor.execute(
                    f"SELECT COLUMN_NAME FROM {columns_view} "
                    "WHERE TABLE_NAME = UPPER(%s) AND TABLE_SCHEMA = CURRENT_SCHEMA()",
                    (name_parts[-1],),
                )
            columns = [row[0] for row in cursor.fetchall()]

            for column in columns:
                # Quote exactly as INFORMATION_SCHEMA reports it so mixed-case names resolve.
                quoted = '"' + column.replace('"', '""') + '"'
                labels = (table, column)

                null_query = f"""
                INSERT INTO data_quality_issues (source_table, column_name, issue_type, issue_description, record_count)
                SELECT
                    %s as source_table,
                    %s as column_name,
                    'NULL_VALUES' as issue_type,
                    'Column contains null values' as issue_description,
                    COUNT(*) as record_count
                FROM {table}
                WHERE {quoted} IS NULL
                HAVING COUNT(*) > 0
                """
                cursor.execute(null_query, labels)

                duplicate_query = f"""
                INSERT INTO data_quality_issues (source_table, column_name, issue_type, issue_description, record_count)
                SELECT
                    %s as source_table,
                    %s as column_name,
                    'DUPLICATE_VALUES' as issue_type,
                    'Column contains duplicate values' as issue_description,
                    COUNT({quoted}) - COUNT(DISTINCT {quoted}) as record_count
                FROM {table}
                HAVING COUNT({quoted}) - COUNT(DISTINCT {quoted}) > 0
                """
                cursor.execute(duplicate_query, labels)

        conn.commit()
    finally:
        cursor.close()

In [None]:
def create_data_lineage_table(conn):
    """Ensure the ``data_lineage`` table exists (no-op if it already does).

    Each row describes one source-column -> target-column mapping plus a
    free-text transformation description. It also gets a generated UUID key
    and created/updated timestamps.
    """
    execute_query(
        conn,
        """
    CREATE TABLE IF NOT EXISTS data_lineage (
        lineage_id VARCHAR(36) DEFAULT UUID_STRING(),
        source_table VARCHAR(255),
        source_column VARCHAR(255),
        target_table VARCHAR(255),
        target_column VARCHAR(255),
        transformation_description VARCHAR(1000),
        created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
        updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
        PRIMARY KEY (lineage_id)
    )
    """,
    )

In [None]:
def batch_update_data_lineage(conn, lineage_data):
    """Upsert lineage edges into ``data_lineage``.

    Args:
        conn: An open Snowflake connection.
        lineage_data: Iterable of ``(source_table, source_column, target_table,
            target_column, transformation_description)`` tuples, passed to
            ``cursor.executemany`` against a single-row MERGE.

    Rows are matched on the four table/column fields. A match only refreshes
    the description and ``updated_at``; otherwise a new edge is inserted.
    An empty input is a no-op. The cursor is closed even if a statement fails.
    """
    rows = list(lineage_data)
    if not rows:
        return

    query = """
    MERGE INTO data_lineage AS target
    USING (
        SELECT
            column1 as source_table,
            column2 as source_column,
            column3 as target_table,
            column4 as target_column,
            column5 as transformation_description
        FROM VALUES (%s, %s, %s, %s, %s)
    ) AS source
    ON target.source_table = source.source_table
    AND target.source_column = source.source_column
    AND target.target_table = source.target_table
    AND target.target_column = source.target_column
    WHEN MATCHED THEN
        UPDATE SET
            transformation_description = source.transformation_description,
            updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN
        INSERT (source_table, source_column, target_table, target_column, transformation_description)
        VALUES (source.source_table, source.source_column, source.target_table, source.target_column, source.transformation_description)
    """

    cursor = conn.cursor()
    try:
        cursor.executemany(query, rows)
        conn.commit()
    finally:
        cursor.close()

In [None]:
def perform_impact_analysis(conn, source_table: str, source_column: str, max_depth: int = 50):
    """Return every downstream (table, column) reachable from a source column.

    Follows ``data_lineage`` edges (source -> target) transitively, starting
    from rows whose ``source_table``/``source_column`` equal the arguments.

    Args:
        conn: An open Snowflake connection.
        source_table: ``source_table`` value to start from (exact match).
        source_column: ``source_column`` value to start from (exact match).
        max_depth: Maximum number of lineage hops to follow (1 = direct
            targets only). This bounds the recursion, so a cycle in
            ``data_lineage`` (e.g. A -> B -> A) cannot recurse until the
            database's recursion limit aborts the query.

    Returns:
        List of distinct ``(target_table, target_column,
        transformation_description)`` tuples.

    Raises:
        ValueError: If ``max_depth`` is less than 1.
    """
    max_depth = int(max_depth)
    if max_depth < 1:
        raise ValueError(f"max_depth must be >= 1, got {max_depth}")

    # depth is cast to INTEGER in the anchor so the recursive "+ 1" has room to grow.
    query = """
    WITH RECURSIVE impact_tree (target_table, target_column, transformation_description, depth) AS (
        SELECT target_table, target_column, transformation_description, 1::INTEGER
        FROM data_lineage
        WHERE source_table = %s AND source_column = %s
        UNION ALL
        SELECT dl.target_table, dl.target_column, dl.transformation_description, it.depth + 1
        FROM data_lineage dl
        JOIN impact_tree it ON dl.source_table = it.target_table AND dl.source_column = it.target_column
        WHERE it.depth < %s
    )
    SELECT DISTINCT target_table, target_column, transformation_description FROM impact_tree
    """

    cursor = conn.cursor()
    try:
        cursor.execute(query, (source_table, source_column, max_depth))
        return cursor.fetchall()
    finally:
        cursor.close()

In [None]:
def main():
    """Run the MDM pipeline end to end on a single Snowflake connection.

    Steps, in order:
      1. Create the master tables.
      2. Merge Spotify/concert data into them.
      3. Create and populate the data-quality table.
      4. Create and populate the lineage table.
      5. Print an impact analysis for ``RAW_DATA.SPOTIFY_TRACKS.artists``.

    On any exception the error message and full traceback are printed and the
    transaction is rolled back. The exception is not re-raised, so check the
    printed output. The final timing line states whether the run succeeded
    or failed. The connection is always closed.
    """
    start_time = time.perf_counter()
    conn = connect_to_snowflake()
    succeeded = False

    try:
        print("Creating MDM tables...")
        create_mdm_tables(conn)

        print("Updating MDM from Spotify...")
        update_mdm_from_spotify(conn)

        print("Creating data quality table...")
        create_data_quality_table(conn)

        print("Profiling data...")
        tables_to_profile = ['RAW_DATA.SPOTIFY_TRACKS', 'RAW_DATA.BILLBOARD_HOT_100', 'RAW_DATA.TWEETS', 'RAW_DATA.CONCERTS']
        profile_data_batch(conn, tables_to_profile)

        print("Creating data lineage table...")
        create_data_lineage_table(conn)

        print("Updating data lineage...")
        lineage_data = [
            ('RAW_DATA.SPOTIFY_TRACKS', 'artists', 'master_artists', 'artist_name', 'Direct mapping from Spotify tracks to master artists'),
            ('RAW_DATA.SPOTIFY_TRACKS', 'track_name', 'master_songs', 'song_name', 'Direct mapping from Spotify tracks to master songs'),
            ('RAW_DATA.BILLBOARD_HOT_100', 'artist', 'master_artists', 'artist_name', 'Direct mapping from Billboard to master artists'),
            ('RAW_DATA.BILLBOARD_HOT_100', 'song', 'master_songs', 'song_name', 'Direct mapping from Billboard to master songs'),
            ('RAW_DATA.TWEETS', 'Username', 'master_artists', 'twitter_handle', 'Potential mapping from Twitter username to artist'),
            ('RAW_DATA.CONCERTS', 'Main_Artist', 'master_artists', 'artist_name', 'Direct mapping from Concerts to master artists'),
            ('RAW_DATA.CONCERTS', 'Venue', 'master_venues', 'venue_name', 'Direct mapping from Concerts to master venues')
        ]
        batch_update_data_lineage(conn, lineage_data)

        print("Performing impact analysis...")
        impact_results = perform_impact_analysis(conn, 'RAW_DATA.SPOTIFY_TRACKS', 'artists')
        print("Impact analysis results for Spotify artists:")
        for result in impact_results:
            print(result)

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        # Keep the full traceback; str(e) alone hides where the failure happened.
        traceback.print_exc()
        try:
            conn.rollback()
        except Exception:
            # A failed rollback (e.g. dropped connection) must not mask the original error.
            traceback.print_exc()
    else:
        conn.commit()
        succeeded = True
    finally:
        conn.close()
        elapsed = time.perf_counter() - start_time
        if succeeded:
            print(f"MDM implementation completed in {elapsed:.2f} seconds")
        else:
            print(f"MDM implementation failed after {elapsed:.2f} seconds")

if __name__ == "__main__":
    main()

Creating MDM tables...
Updating MDM from Spotify...
Creating data quality table...
Profiling data...
Creating data lineage table...
Updating data lineage...
Performing impact analysis...
Impact analysis results for Spotify artists:
('master_artists', 'artist_name', 'Direct mapping from Spotify tracks to master artists')
MDM implementation completed in 65.35 seconds
