In [None]:
import json
import logging
from pathlib import Path
from typing import Optional, Dict
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine, text
from pydantic import BaseModel, field_validator

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger("funnel_pipeline")
logger

<Logger funnel_pipeline (INFO)>

In [3]:
# --------------------------- Reader ---------------------------
class DataReader:
    def read(self) -> pd.DataFrame:
        raise NotImplementedError

class ParquetReader(DataReader):
    def __init__(self, file_path: str):
        self.file_path = Path(file_path)

    def read(self) -> pd.DataFrame:
        path_str = str(self.file_path)
        logger.info(f"Reading parquet from: {path_str}")

        #dosya okuma
        try:
            df = pd.read_parquet(path_str)
        except Exception as e:
            logger.exception("Failed to read parquet with pandas.read_parquet, attempting pyarrow.Dataset")    
        return df

In [5]:
# --------------------------- Models ---------------------------
class UserModel(BaseModel):
    user_id: Optional[int]
    subscriber_id: Optional[int]
    country: Optional[str]
    has_email_contact_permission: Optional[bool]
    has_phone_contact_permission: Optional[bool]

    @field_validator("has_email_contact_permission", "has_phone_contact_permission", mode="before")
    def to_bool(cls, v):
        if v is None:
            return None
        if isinstance(v, str):
            return v.lower() in ["yes", "true", "1", "y"]
        return bool(v)

class SessionModel(BaseModel):
    session_id: str
    user_id: Optional[int]
    user_agent: Optional[str]
    device_type: Optional[str]
    ip_address: Optional[str]
    utm_source: Optional[str]

class EventModel(BaseModel):
    request_id: str
    session_id: str
    funnel_id: Optional[str]
    timestamp: datetime
    page_name: Optional[str]
    search_query: Optional[str]
    destination_id: Optional[int]
    num_guests: Optional[int]
    updated_at: Optional[datetime] = None

    @field_validator('timestamp', mode='before')
    def parse_ts(cls, v):
        if v is None:
            raise ValueError('timestamp is required')
        if isinstance(v, str):
            return datetime.fromisoformat(v)
        if isinstance(v, (int, float)):
            # assume epoch
            return datetime.fromtimestamp(v)
        return v

class HotelModel(BaseModel):
    hotel_id: int
    hotel_price: Optional[float]
    currency: Optional[str]

    @field_validator("hotel_price", mode="before")
    def clean_price(cls, v):
        if v is None:
            return None
        if isinstance(v, str):
            v = v.replace(",", ".").replace("$", "").strip()
        try:
            return float(v)
        except Exception:
            return None

class PaymentModel(BaseModel):
    request_id: str
    payment_status: Optional[str]
    confirmation_number: Optional[str]

    @field_validator("payment_status", mode="before")
    def normalize_status(cls, v):
        if v is None:
            return None
        v = v.strip().lower()
        mapping = {"success": "completed", "done": "completed", "ok": "completed", "paid": "completed", "fail": "failed", "error": "failed"}
        return mapping.get(v, v)

In [6]:
# --------------------------- Processor ---------------------------
class FunnelProcessor:
    def validate_and_normalize(self, df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        users, sessions, events, hotels, payments = [], [], [], [], []
        records = df.to_dict(orient='records')
        for i, row in enumerate(records):
            try:
                # For models that require fields, guard with try except per model so one bad model doesn't drop all
                try:
                    u = UserModel(**row).model_dump(exclude_none=True)
                    u and users.append(u)
                except Exception as e:
                    logger.debug(f"UserModel validation failed for row {i}: {e}")

                try:
                    s = SessionModel(**row).model_dump(exclude_none=True)
                    s and sessions.append(s)
                except Exception as e:
                    logger.debug(f"SessionModel validation failed for row {i}: {e}")

                try:
                    ev = EventModel(**row).model_dump(exclude_none=True)
                    ev and events.append(ev)
                except Exception as e:
                    logger.warning(f"EventModel validation failed for row {i}: {e}")

                try:
                    h = HotelModel(**row).model_dump(exclude_none=True)
                    h and hotels.append(h)
                except Exception:
                    pass

                try:
                    p = PaymentModel(**row).model_dump(exclude_none=True)
                    p and payments.append(p)
                except Exception:
                    pass

            except Exception as e:
                logger.exception(f"Unexpected error validating row {i}: {e}")

        def df_from_list(lst):
            return pd.DataFrame(lst) if len(lst) else pd.DataFrame()

        users_df = df_from_list(users)
        sessions_df = df_from_list(sessions)
        events_df = df_from_list(events)
        hotels_df = df_from_list(hotels)
        payments_df = df_from_list(payments)

        # dedupe where appropriate
        if not users_df.empty and 'user_id' in users_df.columns:
            users_df = users_df.drop_duplicates(subset=['user_id'])
        if not sessions_df.empty and 'session_id' in sessions_df.columns:
            sessions_df = sessions_df.drop_duplicates(subset=['session_id'])
        if not hotels_df.empty and 'hotel_id' in hotels_df.columns:
            hotels_df = hotels_df.drop_duplicates(subset=['hotel_id'])

        return {
            'users': users_df,
            'sessions': sessions_df,
            'events': events_df,
            'hotels': hotels_df,
            'payments': payments_df
        }

In [7]:
# --------------------------- Data Quality ---------------------------
class DataQuality:
    @staticmethod
    def null_check(df: pd.DataFrame, cols: list):
        missing = {c: int(df[c].isna().sum()) for c in cols if c in df.columns}
        total = len(df)
        for c, m in missing.items():
            if m > 0:
                logger.error(f"DQ Failed: {m}/{total} nulls in column {c}")
                return False
        return True

    @staticmethod
    def duplicate_check(df: pd.DataFrame, key_cols: list):
        if not set(key_cols).issubset(set(df.columns)):
            logger.warning("duplicate_check: key cols not in dataframe")
            return True
        dup = df.duplicated(subset=key_cols).sum()
        if dup > 0:
            logger.error(f"DQ Failed: {dup} duplicate rows for keys {key_cols}")
            return False
        return True

In [8]:
# --------------------------- Writer ---------------------------
class MySQLWriter:
    def __init__(self, config_path: str):

        p = Path(config_path)
        text = p.read_text(encoding="utf-8")
        data = json.loads(text)

        kullanici = data["kullanici"]
        sifre = data["sifre"]
        host = data["host"]
        port = data["port"]
        veritabani = data["veritabani"]

        conn_str = f"mysql+pymysql://{kullanici}:{sifre}@{host}:{port}/{veritabani}?charset=utf8mb4"
        self.engine = create_engine(conn_str, pool_pre_ping=True)
        logger.info("MySQL bağlantısı JSON dosyasından oluşturuldu.")

    def write_table(self, df: pd.DataFrame, table_name: str, if_exists="append"):
        if df.empty:
            logger.warning(f"{table_name} tablosu için yazılacak veri yok.")
            return

        df.to_sql(table_name, con=self.engine, index=False, if_exists=if_exists, method="multi")
        logger.info(f"{len(df)} satır {table_name} tablosuna yazıldı.")

In [9]:
# --------------------------------------- PIPELINE ---------------------------------------
class FunnelPipeline:
    def __init__(self, data_path: str, config_path: str):
        self.data_path = Path(data_path)
        self.config_path = config_path
        self.writer = MySQLWriter(config_path=config_path)

    def read_data(self) -> pd.DataFrame:
        """Parquet veya CSV dosyasını okur"""
        if self.data_path.suffix in [".parquet", ".gzip", ".cpgz"]:
            df = pd.read_parquet(self.data_path)
        else:
            raise ValueError("Desteklenmeyen dosya formatı.")
        logger.info(f"{len(df)} satır okundu.")
        return df

    def process_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Basit örnek: null satırları temizle"""
        before = len(df)
        df = df.dropna(how="all")
        after = len(df)
        logger.info(f"Veri temizlendi: {before - after} boş satır silindi.")
        return df

    def run(self, table_name="funnel_events"):
        df = self.read_data()
        df = self.process_data(df)
        self.writer.write_table(df, table_name=table_name, if_exists="append")

In [14]:
# ---------------------------------------  KULLANIM ÖRNEĞİ ---------------------------------------
if __name__ == "__main__":
    data_path = "//Users//sdedeoglu//Desktop//python//case_data.parquet.gzip"
    config_path = "//Users//sdedeoglu//Desktop//python//config.json"

    try:
        pipeline = FunnelPipeline(data_path=data_path, config_path=config_path)
        # Roll back any pending transactions before running
        with pipeline.writer.engine.connect() as conn:
            conn.invalidate() 
        pipeline.run(table_name="funnel_events")
    except Exception as e:
        logger.error(f"Pipeline execution failed: {e}")
        # If pipeline exists, try to rollback any pending transactions
        if 'pipeline' in locals():
            try:
                with pipeline.writer.engine.connect() as conn:
                    conn.rollback()
            except:
                pass

2025-10-18 21:27:44,373 INFO MySQL bağlantısı JSON dosyasından oluşturuldu.
2025-10-18 21:27:46,587 INFO 350690 satır okundu.
2025-10-18 21:27:46,709 INFO Veri temizlendi: 0 boş satır silindi.
2025-10-18 21:29:50,117 ERROR Pipeline execution failed: Can't reconnect until invalid transaction is rolled back.  Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)
