In [1]:
# ==============================================================================
# MASTER ETL SCRIPT (Python Automation) - Step 1 Yangilangan
# ==============================================================================
import requests
import pandas as pd
import os
import io
import json
import time
import pyodbc
import datetime
import numpy as np # Keyingi bosqichlar uchun kerak bo'lishi mumkin
import re          # Keyingi bosqichlar uchun kerak bo'lishi mumkin

print("--- MASTER ETL SCRIPT --- START ---")
# Wall-clock start of the whole run; the main section reports elapsed time from it.
start_time_total = datetime.datetime.now()

# --- Global configuration ---
# GitHub sources: folder holding the raw CSV files, and the JSON that maps
# encoded column names to readable ones.
REPO_RAW_BASE = "https://raw.githubusercontent.com/odilbekmarimov/DemoProject/main/files_final"
COLUMN_MAP_URL = "https://raw.githubusercontent.com/odilbekmarimov/DemoProject/main/column_table_map.json"
# The file -> table mapping itself is defined in code, inside the Step 1 function.

# Local working folders, created up front (no error if they already exist).
RAW_DIR = "raw_data"
DECODED_DIR = "decoded_data"
CLEANED_DIR = "cleaned_data"
os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(DECODED_DIR, exist_ok=True)
os.makedirs(CLEANED_DIR, exist_ok=True)

# SQL Server connection settings (trusted / Windows authentication is enabled).
DB_SERVER = 'WIN-A5I5TM5OKQQ\\SQLEXPRESS'
DB_DATABASE = 'BankingDB'
USE_TRUSTED_CONNECTION = True
driver = '{ODBC Driver 17 for SQL Server}'

# ... (Boshqa global sozlamalar kerak bo'lsa) ...


# --- Yordamchi Funksiyalar (SQL Type, Value Conversion - keyingi bosqichlar uchun) ---
def get_sql_type(dtype):
    """Return the SQL Server column type to use for a pandas/NumPy dtype.

    The predicates are tried in a fixed order (integer, float, datetime,
    bool); the first match wins and anything unmatched maps to NVARCHAR(MAX).
    """
    ordered_checks = (
        (pd.api.types.is_integer_dtype, 'BIGINT'),
        (pd.api.types.is_float_dtype, 'FLOAT'),
        (pd.api.types.is_datetime64_any_dtype, 'DATETIME2'),
        (pd.api.types.is_bool_dtype, 'BIT'),
    )
    for predicate, sql_type in ordered_checks:
        if predicate(dtype):
            return sql_type
    return 'NVARCHAR(MAX)'

def convert_value(item):
    """Convert a NumPy/pandas scalar into a plain Python value for DB drivers.

    - Missing scalars (None, NaN, NaT, pd.NA) become None.
    - NumPy integer / floating / bool scalars become int / float / bool.
    - NumPy datetime64 scalars become ``datetime.datetime``.
    - Anything else (native Python values, non-scalars) is returned unchanged.
    """
    # pd.isna on a list/array returns an array whose truth value is ambiguous
    # (raises ValueError), so only test scalars for missingness.
    if pd.api.types.is_scalar(item) and pd.isna(item):
        return None
    if isinstance(item, np.integer):
        return int(item)
    if isinstance(item, np.floating):
        return float(item)
    if isinstance(item, np.bool_):
        return bool(item)
    if isinstance(item, np.datetime64):
        # np.datetime64.item() returns a bare int for ns precision; going via
        # Timestamp always yields a datetime.datetime.
        return pd.Timestamp(item).to_pydatetime()
    return item

# ==============================================================================
# BOSQICH 1: Ingestion, Decoding, Metadata (Siz yuborgan skript asosida)
# ==============================================================================
def _read_csv_bytes(raw_bytes):
    """Parse downloaded CSV bytes into a DataFrame.

    Tries strict UTF-8 first (``utf-8-sig`` also drops a leading byte-order
    mark) and falls back to Latin-1 when the bytes are not valid UTF-8.
    Decoding the raw bytes here, rather than using ``requests``' ``.text``,
    is what makes the fallback reachable: ``.text`` replaces undecodable
    bytes instead of raising ``UnicodeDecodeError``.
    """
    try:
        return pd.read_csv(io.BytesIO(raw_bytes), encoding='utf-8-sig')
    except UnicodeDecodeError:
        # Latin-1 maps every byte value, so this attempt cannot fail on decoding.
        return pd.read_csv(io.BytesIO(raw_bytes), encoding='latin1')


def _decode_column_names(columns, table_specific_map):
    """Translate encoded column names using one table's column map.

    The lookup key is the text after the last '-' in each original name;
    names with no entry in the map are kept as they are.
    """
    return [table_specific_map.get(str(col).split("-")[-1], col) for col in columns]


def run_step1_ingest_decode_metadata(request_timeout=30):
    """Step 1: download source CSVs, decode column names, log each retrieval.

    For every file in the in-code mapping below, the CSV is downloaded from
    REPO_RAW_BASE, saved as-is under RAW_DIR, parsed, its columns renamed via
    the JSON map at COLUMN_MAP_URL, saved under DECODED_DIR, and one row is
    inserted into retrieveinfo describing the outcome.

    Args:
        request_timeout: seconds passed as ``timeout`` to every HTTP request,
            so a stalled connection cannot hang the ETL forever.

    Returns:
        dict of logical table name -> decoded DataFrame if every file and
        every metadata insert succeeded; otherwise None.
    """
    print("\n--- Starting Step 1: Ingestion, Decoding & Metadata ---")
    # Source CSV -> logical table name and its table_id key in the column-map JSON.
    file_mapping_config = {
        't01.csv': {'logical_name': 'users', 'table_id': '01'},
        't02.csv': {'logical_name': 'cards', 'table_id': '02'},
        't03.csv': {'logical_name': 'transactions', 'table_id': '03'},
        't04.csv': {'logical_name': 'logs', 'table_id': '04'},
        't05.csv': {'logical_name': 'reports', 'table_id': '05'},
        't07.csv': {'logical_name': 'scheduled_payments', 'table_id': '07'},
    }
    dataframes_ingested = {}  # result returned to the caller
    column_map_data_from_json = None
    cnxn_meta = None
    cursor_meta = None
    step1_success = True

    try:
        # --- Metadata connection and retrieveinfo table ---
        print("  [Meta] Setting up connection...")
        if not USE_TRUSTED_CONNECTION:
            # No SQL-auth credentials are configured in this script; fail with a
            # clear message instead of an undefined connection string.
            raise NotImplementedError(
                "SQL Server authentication is not configured; set USE_TRUSTED_CONNECTION = True "
                "or add SQL-auth credentials to the connection string."
            )
        conn_str_meta = (
            f"DRIVER={driver};SERVER={DB_SERVER};DATABASE={DB_DATABASE};Trusted_Connection=yes;"
        )
        cnxn_meta = pyodbc.connect(conn_str_meta, autocommit=False)
        cursor_meta = cnxn_meta.cursor()
        print("  [Meta] Connection established.")
        create_meta_sql = "IF OBJECT_ID('dbo.retrieveinfo', 'U') IS NULL BEGIN CREATE TABLE dbo.retrieveinfo ( retrieve_id INT IDENTITY(1,1) PRIMARY KEY, source_file NVARCHAR(100) NOT NULL, retrieved_at DATETIME2 NOT NULL, total_rows INT NULL, processed_rows INT NULL, errors INT DEFAULT 0, notes NVARCHAR(MAX) NULL ); PRINT '  [Meta] Table retrieveinfo created.'; END ELSE BEGIN PRINT '  [Meta] Table retrieveinfo already exists.'; END"
        cursor_meta.execute(create_meta_sql)
        cnxn_meta.commit()
        print("  [Meta] Table check/creation completed.")

        # --- Column-name map ---
        print(f"  [Config] Loading column map from: {COLUMN_MAP_URL}")
        resp_col_map = requests.get(COLUMN_MAP_URL, timeout=request_timeout)
        resp_col_map.raise_for_status()
        column_map_data_from_json = resp_col_map.json()
        if not isinstance(column_map_data_from_json, dict) or not column_map_data_from_json:
            raise ValueError("Column map JSON is invalid or empty.")
        print("  [Config] Column mapping loaded successfully.")

        # --- Per-file download, decode, save, and metadata logging ---
        print("\n  [Main] Starting file ingestion and decoding loop (using in-code config)...")
        for filename, info in file_mapping_config.items():
            logical_name = info.get('logical_name')
            table_id = info.get('table_id')
            file_url = f"{REPO_RAW_BASE}/{filename}"
            retrieval_start_time = datetime.datetime.now()
            total_rows_read, processed_rows_count = None, None
            error_flag, error_notes = 0, None

            print(f"\n    >>> Processing: {filename} (Logical: {logical_name}, TableID: {table_id})")
            try:
                print(f"      Downloading from {file_url}...")
                resp_csv = requests.get(file_url, timeout=request_timeout)
                resp_csv.raise_for_status()
                print("      Download complete.")

                # Persist the raw bytes before parsing so a malformed file can still be inspected.
                raw_path = os.path.join(RAW_DIR, filename)
                print(f"      Saving raw data to {raw_path}...")
                with open(raw_path, 'wb') as f:
                    f.write(resp_csv.content)
                print("      Raw data saved.")

                print("      Reading CSV...")
                df = _read_csv_bytes(resp_csv.content)
                total_rows_read = len(df)
                print(f"      Read complete: {total_rows_read} rows.")

                print("      Decoding columns...")
                table_specific_map = column_map_data_from_json.get(table_id, {}).get("columns", {})
                if not table_specific_map:
                    print(f"        Warning: No mapping for table_id '{table_id}'. Keeping original names.")
                else:
                    decoded_columns = _decode_column_names(df.columns, table_specific_map)
                    df.columns = decoded_columns
                    print(f"        Columns decoded. Example: {decoded_columns[:5]}...")
                processed_rows_count = len(df)
                print("      Decoding complete.")

                print(f"      Storing '{logical_name}' in memory.")
                dataframes_ingested[logical_name] = df
                decoded_path = os.path.join(DECODED_DIR, f"decoded_{filename}")
                print(f"      Saving decoded data to {decoded_path}...")
                df.to_csv(decoded_path, index=False, encoding='utf-8')
                print("      Decoded data saved.")
                error_flag = 0
                error_notes = "Success"
                print(f"      >>> Successfully processed {filename}.")

            except Exception as e:
                # A per-file failure is recorded in metadata and fails the step,
                # but the loop carries on with the remaining files.
                error_flag = 1
                error_notes = f"Error with {filename}: {e}"
                print(f"      ERROR: {error_notes}")
                dataframes_ingested[logical_name] = pd.DataFrame()
                step1_success = False

            # One metadata row per file, committed immediately so rows for earlier
            # files survive a later failure.
            print(f"      Logging metadata for {filename}...")
            try:
                sql_meta = "INSERT INTO retrieveinfo (source_file, retrieved_at, total_rows, processed_rows, errors, notes) VALUES (?, ?, ?, ?, ?, ?)"
                meta_tuple = (filename, retrieval_start_time, total_rows_read, processed_rows_count, error_flag, str(error_notes)[:4000])
                cursor_meta.execute(sql_meta, meta_tuple)
                cnxn_meta.commit()
                print("      Metadata logged.")
            except Exception as meta_ex:
                print(f"      ERROR logging metadata for {filename}: {meta_ex}")
                step1_success = False
                # A failing rollback (e.g. dead connection) must not abort the remaining files.
                try:
                    cnxn_meta.rollback()
                except pyodbc.Error as rb_ex:
                    print(f"      ERROR rolling back metadata insert for {filename}: {rb_ex}")
            time.sleep(0.1)
        print("\n  [Main] Ingestion and decoding loop finished.")

    except Exception as e:
        # Setup failures (connection, table creation, column map) abort the whole step.
        print(f"  ERROR in Step 1 main process: {e}")
        step1_success = False
    finally:
        if cursor_meta is not None:
            cursor_meta.close()
        if cnxn_meta is not None:
            cnxn_meta.close()
            print("  [Meta] Connection closed.")
        print(f"--- Step 1 Finished. Overall Success: {step1_success} ---")
    return dataframes_ingested if step1_success else None


# ==============================================================================
# BOSQICH 2: Data Cleaning (Placeholder - Buni ham to'ldirishingiz kerak)
# ==============================================================================
def run_step2_cleaning(dataframes_to_clean, output_dir=None):
    """Step 2: clean each ingested DataFrame and save it as CSV.

    Cleaning is still a placeholder: each DataFrame is copied unchanged. The
    nested settings and helpers below are scaffolding for the table-specific
    cleaning functions that are meant to be added here.

    Args:
        dataframes_to_clean: dict of logical name -> DataFrame from Step 1,
            or None when Step 1 failed.
        output_dir: folder for the ``cleaned_<name>.csv`` files; defaults to
            the module-level CLEANED_DIR.

    Returns:
        dict of logical name -> cleaned DataFrame, or None when the input was
        None or any table failed to clean/save.
    """
    print("\n--- Starting Step 2: Data Cleaning ---")
    if dataframes_to_clean is None:
        print("  ERROR: No dataframes from Step 1. Skipping cleaning.")
        return None
    if output_dir is None:
        output_dir = CLEANED_DIR
    cleaned_dataframes_output = {}
    step2_success = True

    # --- Cleaning settings (scaffolding: not used by the placeholder yet) ---
    TRANSACTION_AMOUNT_THRESHOLD_CLEAN = 15000
    EMAIL_REGEX_CLEAN = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    PHONE_REGEX_CLEAN = r"^(?=.*\d{6,})[\d\s\+\-\(\)]+$"

    # --- Conversion helpers for the table-specific cleaners (not called yet) ---
    def safe_to_datetime(series, **kwargs):
        """Parse to datetime; unparseable values become NaT."""
        return pd.to_datetime(series, errors='coerce', **kwargs)

    def safe_to_numeric(series, **kwargs):
        """Parse to numbers; unparseable values become NaN."""
        return pd.to_numeric(series, errors='coerce', **kwargs)

    def safe_to_boolean(series):
        """Map common true/false spellings to bool; unrecognised or missing values become False."""
        if series.dtype == 'bool':
            return series
        map_dict = {'true': True, 'false': False, '1': True, '0': False, 1: True, 0: False, 'yes': True, 'no': False}
        return series.apply(lambda x: str(x).lower() if pd.notna(x) else x).map(map_dict).fillna(False).astype(bool)

    def safe_to_int64(series):
        """Convert to nullable Int64; keep the numeric (float) result if values are not whole numbers."""
        numeric_series = safe_to_numeric(series)
        try:
            return numeric_series.astype('Int64')
        except (TypeError, ValueError):
            return numeric_series

    def clean_dataframe_generic(df_orig, logical_name_clean):
        """Generic cleaning: copy the frame and strip surrounding whitespace in string columns."""
        print(f"    Cleaning generic: {logical_name_clean}, Original shape: {df_orig.shape if df_orig is not None else 'None'}")
        if df_orig is None or df_orig.empty:
            return pd.DataFrame()
        df = df_orig.copy()
        for col in df.select_dtypes(include=['object']).columns:
            if pd.api.types.is_string_dtype(df[col]):
                try:
                    df[col] = df[col].str.strip()
                except AttributeError:
                    # The .str accessor is unavailable when the object column holds no strings.
                    pass
        return df

    # TODO: add the table-specific cleaners (clean_users, clean_cards, ...) here.
    # Their column names must match the decoded names produced by Step 1
    # (e.g. 'id', 'name').

    print("    PLACEHOLDER: Data cleaning functions need to be fully implemented here from your Script 2.")
    for name, df_orig_clean in dataframes_to_clean.items():
        print(f"    Cleaning (placeholder) for: {name}")
        try:
            # Placeholder cleaning: a plain copy (None treated as an empty table,
            # matching clean_dataframe_generic).
            cleaned_df = df_orig_clean.copy() if df_orig_clean is not None else pd.DataFrame()
            cleaned_dataframes_output[name] = cleaned_df
            os.makedirs(output_dir, exist_ok=True)
            save_path = os.path.join(output_dir, f"cleaned_{name}.csv")
            cleaned_df.to_csv(save_path, index=False, encoding='utf-8')
            print(f"      Saved cleaned (placeholder) data for {name} to {save_path}")
        except Exception as e:
            # Per-table boundary: log, fail the step, keep processing the other tables.
            print(f"      ERROR cleaning/saving {name}: {e}")
            step2_success = False

    print(f"--- Step 2 Finished. Success: {step2_success} ---")
    return cleaned_dataframes_output if step2_success else None

# ==============================================================================
# BOSQICH 3: Load to SQL (Placeholder)
# ==============================================================================
def run_step3_load_to_sql(data_to_load):
    """Step 3 (placeholder): load the cleaned tables into SQL Server.

    Returns False when there is nothing to load (``data_to_load`` is None);
    otherwise returns True without performing any database work yet.
    """
    print("\n--- Starting Step 3: Load to SQL Server ---")
    if data_to_load is not None:
        # TODO: port the Script 3 load logic (including PK/FK creation) here,
        # using the last working version with NumPy -> Python value conversion;
        # loading each table separately is an option.
        # NOTE(review): this placeholder reports success without loading anything.
        print("    PLACEHOLDER: SQL Loading logic needs to be implemented here from your Script 3.")
        succeeded = True
        print(f"--- Step 3 Finished. Success: {succeeded} ---")
        return succeeded
    print("  ERROR: No cleaned data. Skipping SQL load.")
    return False

# ==============================================================================
# BOSQICH 5: SQL Transformations (Create Views - Placeholder)
# ==============================================================================
def run_step5_sql_views():
    """Step 5 (placeholder): create the reporting SQL views.

    No SQL is executed yet; the function only prints progress and returns True.
    """
    print("\n--- Starting Step 5: Create SQL Views ---")
    # TODO: execute the view-creation SQL script through pyodbc here.
    print("    PLACEHOLDER: SQL View creation logic needs to be implemented here.")
    finished_ok = True
    print(f"--- Step 5 Finished. Success: {finished_ok} ---")
    return finished_ok

# ==============================================================================
# ASOSIY ISH OQIMI
# ==============================================================================
if __name__ == "__main__":
    # Pipeline orchestration: each step runs only when the previous one produced
    # usable (truthy) output; otherwise a "Skipping ..." line is printed.
    print(">>> MASTER ETL SCRIPT: Starting Banking ETL Process <<<")

    # Step 1 -> dict of DataFrames, or None on failure.
    ingested_data = run_step1_ingest_decode_metadata()

    # Step 2 needs a non-empty result from Step 1.
    if not ingested_data:
        cleaned_data = None
        print("Skipping Step 2 due to errors or no data from Step 1.")
    else:
        cleaned_data = run_step2_cleaning(ingested_data)

    # Step 3 needs a non-empty result from Step 2.
    if not cleaned_data:
        load_sql_success = False
        print("Skipping Step 3 due to errors or no data from Step 2.")
    else:
        load_sql_success = run_step3_load_to_sql(cleaned_data)

    # Step 5 runs only after a successful SQL load.
    if not load_sql_success:
        views_success = False
        print("Skipping Step 5 due to errors in Step 3.")
    else:
        views_success = run_step5_sql_views()

    end_time_total = datetime.datetime.now()
    print(f"\n>>> Banking ETL Process Finished in {end_time_total - start_time_total} <<<")

    if all((ingested_data, cleaned_data, load_sql_success, views_success)):
        print("All ETL steps reported success!")
    else:
        print("ETL Process completed with one or more reported errors. Please review logs.")

--- MASTER ETL SCRIPT --- START ---
>>> MASTER ETL SCRIPT: Starting Banking ETL Process <<<

--- Starting Step 1: Ingestion, Decoding & Metadata ---
  [Meta] Setting up connection...
  [Meta] Connection established.
  [Meta] Table check/creation completed.
  [Config] Loading column map from: https://raw.githubusercontent.com/odilbekmarimov/DemoProject/main/column_table_map.json
  [Config] Column mapping loaded successfully.

  [Main] Starting file ingestion and decoding loop (using in-code config)...

    >>> Processing: t01.csv (Logical: users, TableID: 01)
      Downloading from https://raw.githubusercontent.com/odilbekmarimov/DemoProject/main/files_final/t01.csv...
      Download complete.
      Reading CSV...
      Read complete: 600 rows.
      Saving raw data to raw_data\t01.csv...
      Raw data saved.
      Decoding columns...
        Columns decoded. Example: ['id', 'name', 'phone_number', 'email', 'created_at']...
      Decoding complete.
      Storing 'users' in memory.
    