# Big Data Engineering: Analyzing Willhaben Data

## Scraping the data

Willhaben.at, as Austria’s largest classifieds website, is a rich source of data for market analysis, especially in real estate, automobiles, and second-hand goods. However, scraping Willhaben can be challenging due to dynamic content loading and bot protection. A robust approach involves using Scrapy for structured scraping logic, with Selenium integrated as middleware to render JavaScript and bypass basic anti-bot defenses.

### 🛠 Tools & Technologies

* **Scrapy**: Python-based web crawling framework for structured data extraction.

* **Selenium**: Browser automation tool to handle JavaScript-rendered pages.

* **WebDriver**: Firefox in headless mode for performance.

* **Apache Kafka**: Streaming platform that receives the scraped records.

### 🔁 Workflow Overview

* Scrapy handles crawl control and item pipelines.

* Selenium acts as a renderer for pages that require JS execution.

* Middleware intercepts requests needing rendering and returns fully loaded HTML to Scrapy.

* Data is extracted from the rendered HTML using Beautiful Soup and XPath/CSS selectors.

* After each extraction, the item is forwarded to the Kafka broker.

### 🛡 Anti-Bot Considerations

* Random User Agents: Rotate user agents for each session.

* Delay & Throttle: Respect server load by setting Scrapy's `DOWNLOAD_DELAY` (optionally combined with AutoThrottle).

* Proxy Pools: Use rotating proxies to avoid IP blocks.
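
The three measures above map onto Scrapy settings plus a small downloader middleware. A minimal sketch, assuming the middleware lives in `willhaben/middlewares.py` (hypothetical path) and that `USER_AGENTS` and `PROXY_LIST` are custom settings defined by the project (hypothetical names); `DOWNLOAD_DELAY`, `RANDOMIZE_DOWNLOAD_DELAY`, `AUTOTHROTTLE_ENABLED` and `DOWNLOADER_MIDDLEWARES` are standard Scrapy settings. The user agent is picked once per crawl to match "for each session", while the proxy rotates per request via `request.meta["proxy"]`, which Scrapy's built-in `HttpProxyMiddleware` reads.

```python
import random


class SessionIdentityMiddleware:
    """Hypothetical middleware: one user agent per crawl session, rotating proxies per request."""

    def __init__(self, user_agents, proxies=None, rng=None):
        self.rng = rng or random.Random()
        # Chosen once, so every request in this session sends the same user agent
        self.user_agent = self.rng.choice(list(user_agents))
        self.proxies = list(proxies or [])

    @classmethod
    def from_crawler(cls, crawler):
        # USER_AGENTS and PROXY_LIST are assumed custom settings
        return cls(crawler.settings.getlist("USER_AGENTS"), crawler.settings.getlist("PROXY_LIST"))

    def process_request(self, request, spider):
        request.headers["User-Agent"] = self.user_agent
        if self.proxies:
            # Scrapy's HttpProxyMiddleware routes the request through meta["proxy"]
            request.meta["proxy"] = self.rng.choice(self.proxies)
        return None  # continue normal download handling


# --- settings.py fragment (values are illustrative) ---
USER_AGENTS = [
    "Mozilla/5.0 (X11; Linux x86_64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
]
DOWNLOAD_DELAY = 2               # seconds between requests to the same site
RANDOMIZE_DOWNLOAD_DELAY = True  # wait between 0.5x and 1.5x DOWNLOAD_DELAY
AUTOTHROTTLE_ENABLED = True      # adapt the delay to observed server latency
DOWNLOADER_MIDDLEWARES = {
    "willhaben.middlewares.SessionIdentityMiddleware": 400,
    "scrapy.downloadermiddlewares.useragent.UserAgentMiddleware": None,  # disable the default
}
```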

### 📊 Use Cases

* Real Estate: Track price changes and availability per district.

* Cars: Analyze price depreciation trends by brand and model.

* Consumer Goods: Detect high-demand used products.

# 🕷 Run the web scraper

The first step is to collect the ad URLs and Willhaben codes from a category of your choice.

`navigation_url` is the listing URL of that category; the spider uses it to iterate through the result pages that display the product lists.

In [None]:
import os

if not os.path.isfile('scrapy.cfg'):
    os.chdir('willhaben')

!scrapy crawl willhaben_urls -a navigation_url="https://www.willhaben.at/iad/kaufen-und-verkaufen/marktplatz/buecher/comics-mangas-2913"

A new scraping session begins with the creation of a `scrape_run_id`, which is used to reference and track the ads in the subsequent steps.

The second phase involves the `willhaben_items` scraper, which retrieves the URLs associated with a given scrape run, sends requests to each ad, and extracts data based on the structure defined in the `WillhabenItem` class.

In [None]:
import os

if not os.path.isfile('scrapy.cfg'):
    os.chdir('willhaben')

!scrapy crawl willhaben_items -a scrape_run_id=13

## Example: Analyzing cars 🏎
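
Every analysis cell below repeats the same connect, query and close pattern. A small helper could centralize it; this is a sketch that is not part of the original project, and `db_cursor` and `query_df` are hypothetical names. It takes the connect function as an argument (e.g. `psycopg2.connect`) and always closes both cursor and connection, which matters because psycopg2's own `with connection:` block only ends the transaction and leaves the connection open. Values are bound as query parameters (`%s` placeholders) instead of being formatted into the SQL string.

```python
from contextlib import contextmanager

import pandas as pd


@contextmanager
def db_cursor(connect, **params):
    """Open a connection and a cursor, and always close both afterwards."""
    connection = connect(**params)
    try:
        cursor = connection.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
    finally:
        connection.close()


def query_df(connect, params, sql, args, columns):
    """Run a parameterized query and return the rows as a DataFrame."""
    with db_cursor(connect, **params) as cursor:
        # The driver binds `args`; they are never string-formatted into `sql`
        cursor.execute(sql, args)
        return pd.DataFrame(cursor.fetchall(), columns=columns)
```

For example, `query_df(psycopg2.connect, db_params, "SELECT price FROM willhaben_items WHERE scrape_run_id = %s", (scrape_run_id,), ["price"])` would return the prices of one scrape run as a DataFrame.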

In [None]:
# ID of the scrape run to analyze
scrape_run_id = 7

# PostgreSQL connection parameters (local database)
db_params = {
    'dbname': 'scraped',
    'user': 'scraped',
    'password': 'scraped',
    'host': '127.0.0.1',
    'port': '5432'
}

### Average Mileage vs. Registration Year

This script connects to a PostgreSQL database to retrieve car listing data, specifically the registration date (Erstzulassung) and mileage (Kilometerstand) from a JSONB field. After fetching the data for a specific scrape run, it cleans and transforms it: dates are parsed, mileage is converted to numeric values, and invalid entries are filtered out. The cleaned data is then grouped by registration year, the average mileage is calculated for each year, and the result is plotted as a line chart.
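
The cleaning steps can be checked in isolation on a few made-up values before running them against the database. This sketch uses the vectorized `pd.to_datetime(..., errors='coerce')` and `pd.to_numeric(..., errors='coerce')`, so a malformed date or mileage becomes a missing value and is dropped instead of raising an exception. The function name `mileage_by_year` is hypothetical.

```python
import pandas as pd

# Made-up sample rows in the formats described above ("4/2018", "45.000 km")
raw = pd.DataFrame({
    "Erstzulassung": ["4/2018", "11/2018", "1/2020", "unknown"],
    "Kilometerstand": ["45.000 km", "55.000 km", "12.500 km", "30.000 km"],
})


def mileage_by_year(df):
    out = df.copy()
    # Invalid dates become NaT instead of raising
    out["Erstzulassung"] = pd.to_datetime(out["Erstzulassung"], format="%m/%Y", errors="coerce")
    # Strip " km" and the "." thousands separator literally, then convert to numbers
    km = out["Kilometerstand"].str.replace(" km", "", regex=False).str.replace(".", "", regex=False)
    out["Kilometerstand"] = pd.to_numeric(km, errors="coerce")
    out = out.dropna(subset=["Erstzulassung", "Kilometerstand"])
    # Average mileage per registration year
    return out.groupby(out["Erstzulassung"].dt.year)["Kilometerstand"].mean()


result = mileage_by_year(raw)
```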

In [None]:
import psycopg2
from psycopg2 import Error
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

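# Initialize handles so the finally block is safe even if connecting fails
connection = None
cursor = None

try:
    # Establish connection to PostgreSQL database
    connection = psycopg2.connect(**db_params)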

    # Create a cursor to perform database operations
    cursor = connection.cursor()

    # Define the SQL query; scrape_run_id is passed as a bound parameter
    query = """
    SELECT
        data->'Basisdaten'->>'Erstzulassung' AS Erstzulassung,
        data->'Basisdaten'->>'Kilometerstand' AS Kilometerstand
    FROM willhaben_items
    WHERE
        data->'Basisdaten'->>'Erstzulassung' IS NOT NULL
        AND data->'Basisdaten'->>'Kilometerstand' IS NOT NULL
        AND scrape_run_id = %s;
    """

    # Execute the query (parameter binding avoids SQL injection)
    cursor.execute(query, (scrape_run_id,))

    # Fetch all rows
    rows = cursor.fetchall()

    # Convert fetched rows to a pandas DataFrame
    df = pd.DataFrame(rows, columns=['Erstzulassung', 'Kilometerstand'])

    # Data preprocessing
    # Parse Erstzulassung (format like "4/2018"); invalid dates become NaT
    df['Erstzulassung'] = pd.to_datetime(df['Erstzulassung'], format='%m/%Y', errors='coerce')

    # Clean Kilometerstand: remove ' km' and the '.' thousands separator, then convert to numeric
    # (regex=False treats '.' literally; unparseable values become NaN)
    df['Kilometerstand'] = pd.to_numeric(
        df['Kilometerstand'].str.replace(' km', '', regex=False).str.replace('.', '', regex=False),
        errors='coerce'
    )

    # Drop rows with invalid dates or mileage
    df = df.dropna(subset=['Erstzulassung', 'Kilometerstand'])

    # Create buckets by extracting the year from Erstzulassung
    df['Year'] = df['Erstzulassung'].dt.year

    # Aggregate by year: calculate mean Kilometerstand for each year
    df_buckets = df.groupby('Year')['Kilometerstand'].mean().reset_index()

    # Create a line chart
    plt.figure(figsize=(20, 6))
    plt.plot(df_buckets['Year'], df_buckets['Kilometerstand'], marker='o', linestyle='-', color='b')
    plt.title('Average Mileage vs. Registration Year')
    plt.xlabel('Registration Year')
    plt.ylabel('Average Mileage')
    plt.grid(True)
    plt.xticks(df_buckets['Year'], rotation=90)  # Ensure years are shown as integers
    plt.tight_layout()

    # Show the plot
    plt.show()

except (Exception, Error) as error:
    print("Error while querying or processing data:", error)

finally:
    # Close cursor and database connection if they were opened
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()

### Percentage of Car Colors by Brand

This script connects to a PostgreSQL database to extract car listing data, focusing on the exterior color (Außenfarbe) and the brand, which is derived from the breadcrumbs field. After filtering out invalid entries, it builds brand-color pairs and limits the dataset to the 20 most common brands and the 20 most common colors. A pivot table counts how often each color appears per brand, and the counts are converted into percentages per brand so that brands with different numbers of listings can be compared. Finally, a stacked bar chart shows the color distribution across the top brands, using a custom mapping from the German color names to matching plot colors.
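
The percentage step is a row normalization: each brand's color counts are divided by that brand's total, so every bar sums to 100 %. A sketch on made-up data showing that `pd.crosstab(..., normalize="index")` yields the same table as the groupby, unstack and divide approach used in the cell below:

```python
import pandas as pd

# Made-up brand/color pairs standing in for the rows built from the query
pairs = pd.DataFrame(
    [("VW", "Schwarz"), ("VW", "Weiß"), ("VW", "Schwarz"), ("BMW", "Blau")],
    columns=["Brand", "Color"],
)

# Approach used below: count each (brand, color), then divide every row by its total
counts = pairs.groupby(["Brand", "Color"]).size().unstack(fill_value=0)
pct_manual = counts.div(counts.sum(axis=1), axis=0) * 100

# Equivalent one-liner: normalize="index" divides each row by its own sum
pct_crosstab = pd.crosstab(pairs["Brand"], pairs["Color"], normalize="index") * 100
```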

In [None]:
%matplotlib inline
import psycopg2
from psycopg2 import Error
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter

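# Initialize handles so the finally block is safe even if connecting fails
connection = None
cursor = None

try:
    # Establish connection to PostgreSQL database
    connection = psycopg2.connect(**db_params)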

    # Create a cursor to perform database operations
    cursor = connection.cursor()

    # Define the SQL query to extract Außenfarbe and breadcrumbs
    query = """
    SELECT
        data->'Karosserie & Technik'->>'Außenfarbe' AS color,
        breadcrumbs
    FROM willhaben_items
    WHERE scrape_run_id = %s;
    """

    # Execute the query with scrape_run_id bound as a parameter
    cursor.execute(query, (scrape_run_id,))

    # Fetch all rows
    rows = cursor.fetchall()

    # Process the colors and brands: collect color-brand pairs
    data = []
    total_valid_records = 0

    for row in rows:
        color = row[0]
        breadcrumbs = row[1]
        if not (color and color.strip() and breadcrumbs):
            continue
        try:
            brand = breadcrumbs.split(',')[3].strip()
        except IndexError:
            continue
        data.append((brand, color.strip()))
        total_valid_records += 1

    if total_valid_records == 0:
        raise ValueError("No valid records with non-empty color and brand data found.")

    # Create a DataFrame
    df = pd.DataFrame(data, columns=['Brand', 'Color'])

    # Limit to top brands and colors
    top_brands = df['Brand'].value_counts().head(20).index
    top_colors = df['Color'].value_counts().head(20).index
    df_filtered = df[df['Brand'].isin(top_brands) & df['Color'].isin(top_colors)]

    pivot_table = df_filtered.groupby(['Brand', 'Color']).size().unstack(fill_value=0)

    # Calculate percentages for stacking
    pivot_table_percentage = pivot_table.div(pivot_table.sum(axis=1), axis=0) * 100
    
    # Print colors and their counts
    color_counts = Counter(df_filtered['Color'])
    print(color_counts.most_common())

    # Define color mapping (German color names to matplotlib colors)
    color_mapping = {
        'Schwarz': '#000000',  # Black
        'Grau': '#808080',     # Gray
        'Weiß': '#F5F5F5',     # Light grayish-white (for visibility)
        'Blau': '#0000FF',     # Blue
        'Silber': '#C0C0C0',   # Silver
        'Rot': '#FF0000',      # Red
        'Grün': '#008000',     # Green
        'Sonstige': '#D3D3D3', # Light gray
        'Gelb': '#FFFF00',     # Yellow
        'Braun': '#8B4513',    # Brown
        'Orange': '#FFA500',   # Orange
        'Violett': '#800080',  # Purple
        'Gold': '#FFD700',     # Gold
        'Beige': '#F5F5DC',    # Beige
        'Bronze': '#CD7F32'    # Bronze
    }

    # Map colors to the pivot table columns, use fallback for unmapped colors
    plot_colors = [color_mapping.get(color, 'lightgray') for color in pivot_table_percentage.columns]

    # Create a stacked bar chart with mapped colors
    plt.figure(figsize=(12, 8), dpi=100)
    ax = pivot_table_percentage.plot(
        kind='bar', 
        stacked=True, 
        color=plot_colors, 
        edgecolor='black',  # Add edgecolor for white (Weiß) visibility
        ax=plt.gca()
    )
    plt.title('Percentage of Car Colors by Brand', fontsize=10)
    plt.xlabel('Brand', fontsize=12)
    plt.ylabel('Percentage (%)', fontsize=12)
    plt.xticks(fontsize=10, rotation=45, ha='right')
    plt.yticks(fontsize=10)
    plt.legend(title='Color', labels=pivot_table_percentage.columns, fontsize=14, 
               bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()

    # Display the plot
    plt.show()

except (Exception, Error) as error:
    print("Error while querying or processing data:", error)

finally:
    # Close cursor and database connection if they were opened
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()

### Percentage of Cars with Each Feature (Ausstattungen & Extras) (Top 20)
This script connects to a PostgreSQL database to fetch car listing data, specifically the Ausstattungen & Extras field, which contains a list of car features. It counts how often each feature occurs across all valid records and calculates the percentage of cars that include each feature. The 20 most common features are then visualized in a bar chart.
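
The percentage computed below is the share of valid listings that mention a feature: the number of listings containing it, divided by the number of valid listings, times 100. A stdlib sketch with a hypothetical `feature_percentages` helper; one deliberate difference from the cell below is that each listing's features are deduplicated with `set()`, so a feature repeated within one listing cannot push its share above 100 %.

```python
from collections import Counter


def feature_percentages(feature_lists):
    """Return ({feature: % of valid cars listing it}, number of valid cars)."""
    counts = Counter()
    valid = 0
    for features in feature_lists:
        if isinstance(features, list) and features:  # skip None and empty lists
            counts.update(set(features))  # count each feature at most once per car
            valid += 1
    if valid == 0:
        raise ValueError("No valid feature lists found.")
    return {feature: count / valid * 100 for feature, count in counts.items()}, valid


# Made-up feature lists standing in for the jsonb arrays from the query
sample = [["ABS", "Klimaanlage"], ["ABS", "ABS"], [], None]
percentages, valid = feature_percentages(sample)
top = Counter(percentages).most_common(20)  # top 20 features by percentage
```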

In [None]:
import psycopg2
from psycopg2 import Error
import pandas as pd
import matplotlib.pyplot as plt
import json
from collections import Counter

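# Initialize handles so the finally block is safe even if connecting fails
connection = None
cursor = None

try:
    # Establish connection to PostgreSQL database
    connection = psycopg2.connect(**db_params)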

    # Create a cursor to perform database operations
    cursor = connection.cursor()

    # Define the SQL query; scrape_run_id is passed as a bound parameter
    query = """
    SELECT
        data->'Ausstattungen & Extras' AS extras
    FROM willhaben_items
    WHERE scrape_run_id = %s;
    """

    # Execute the query
    cursor.execute(query, (scrape_run_id,))

    # Fetch all rows
    rows = cursor.fetchall()

    # Process the extras: count features and total valid records
    feature_counts = Counter()
    total_valid_records = 0

    for row in rows:
        # Extract the JSONB array (row[0] is the JSONB object)
        extras = row[0]
        if extras:  # Skip null or empty arrays
            try:
                # psycopg2 already decodes jsonb into Python objects; parse only if a JSON string was stored
                features = json.loads(extras) if isinstance(extras, str) else extras
                if isinstance(features, list) and features:  # Ensure it's a non-empty list
                    feature_counts.update(features)
                    total_valid_records += 1  # Count valid records
            except (json.JSONDecodeError, TypeError):
                continue  # Skip invalid JSON or non-list data

    # Calculate percentages: (feature count / total valid records) * 100
    if total_valid_records == 0:
        raise ValueError("No valid records with non-empty feature arrays found.")

    feature_percentages = {
        feature: (count / total_valid_records) * 100
        for feature, count in feature_counts.items()
    }

    # Convert to a pandas DataFrame
    df = pd.DataFrame.from_dict(feature_percentages, orient='index', columns=['Percentage'])
    df = df.sort_values('Percentage', ascending=False).reset_index().rename(columns={'index': 'Feature'})

    # Limit to top 20 features for readability
    df_top = df.head(20)

    # Print the DataFrame
    print(f"Feature Percentages (Top 20, based on {total_valid_records} valid records):")
    print(df_top.to_string(index=False))

    # Create a bar chart
    plt.figure(figsize=(12, 6))
    plt.bar(df_top['Feature'], df_top['Percentage'], color='b')
    plt.title('Percentage of Cars with Each Feature (Ausstattungen & Extras) (Top 20)')
    plt.xlabel('Feature')
    plt.ylabel('Percentage of Cars (%)')
    plt.xticks(rotation=60, fontsize=10, ha='right')
    plt.grid(True, axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()

    # Show the plot
    plt.show()

except (Exception, Error) as error:
    print("Error while querying or processing data:", error)

finally:
    # Close cursor and database connection if they were opened
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()

### Median Price vs. Number of Features (Ausstattungen & Extras)

This script connects to a PostgreSQL database to fetch car listing data, specifically the features from the Ausstattungen & Extras field and the price. It counts the number of features per car, groups the cars into buckets of five by feature count (0-4, 5-9, 10-14 features, and so on), and calculates the median price per bucket, discarding buckets with fewer than five cars. It then computes the Pearson correlation coefficient between the bucket midpoints and the bucket median prices and, if valid data is present, plots the median price against the number of features.
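
Two details of the bucketing are easy to get wrong. With `right=False`, each bin `[a, b)` excludes its right edge, so the last edge must be strictly greater than the largest feature count; otherwise, when that maximum is a multiple of 5, those cars fall outside every bin. And the correlation is computed on bucket midpoints against bucket medians, not on individual cars. A sketch on made-up data, with a hypothetical `bucket_edges` helper:

```python
import numpy as np
import pandas as pd


def bucket_edges(max_count, width=5):
    """Edges 0, width, 2*width, ... with the last edge strictly greater than max_count."""
    # right=False excludes each bin's right edge, so the maximum needs a bin above it
    return list(range(0, max_count + width + 1, width))


# Made-up (feature count, price) pairs
df = pd.DataFrame({
    "Feature_Count": [1, 3, 5, 7, 10, 12],
    "Price": [5000.0, 7000.0, 9000.0, 11000.0, 15000.0, 17000.0],
})

df["Feature_Bucket"] = pd.cut(df["Feature_Count"], bins=bucket_edges(df["Feature_Count"].max()), right=False)
medians = df.groupby("Feature_Bucket", observed=True)["Price"].median()

midpoints = [interval.mid for interval in medians.index]  # one x value per bucket
r = np.corrcoef(midpoints, medians.to_numpy())[0, 1]      # Pearson r on bucket-level values
```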

In [None]:
import psycopg2
from psycopg2 import Error
import pandas as pd
import matplotlib.pyplot as plt
import json
import numpy as np

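# Initialize handles so the finally block is safe even if connecting fails
connection = None
cursor = None

try:
    # Establish connection to PostgreSQL database
    connection = psycopg2.connect(**db_params)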

    # Create a cursor to perform database operations
    cursor = connection.cursor()

    # Define the SQL query to fetch extras from JSON and price from column
    query = """
    SELECT
        data->'Ausstattungen & Extras' AS extras,
        price
    FROM willhaben_items
    WHERE price IS NOT NULL
    AND scrape_run_id = %s;
    """

    # Execute the query with scrape_run_id bound as a parameter
    cursor.execute(query, (scrape_run_id,))

    # Fetch all rows
    rows = cursor.fetchall()
    print(f"Fetched {len(rows)} rows from the database.")

    # Process the data: count features per car and associate with price
    feature_count_price = []
    total_valid_records = 0

    for row in rows:
        extras, price = row[0], row[1]
        if extras and price is not None:  # Skip null or empty arrays and null prices
            try:
                # psycopg2 already decodes jsonb into Python objects; parse only if a JSON string was stored
                features = json.loads(extras) if isinstance(extras, str) else extras
                if isinstance(features, list) and features:  # Ensure it's a non-empty list
                    feature_count = len(features)  # Count number of features
                    feature_count_price.append((feature_count, price))
                    total_valid_records += 1
            except (json.JSONDecodeError, TypeError):
                continue  # Skip invalid JSON or non-list data

    if total_valid_records == 0:
        raise ValueError("No valid records with non-empty feature arrays and prices found.")

    print(f"Total valid records: {total_valid_records}")

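    # Create a DataFrame from feature counts and prices
    df = pd.DataFrame(feature_count_price, columns=['Feature_Count', 'Price'])
    # Ensure prices are floats (a NUMERIC column arrives as Decimal, which pandas stores as object dtype)
    df['Price'] = df['Price'].astype(float)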

    # Bucket the feature counts into half-open ranges of width 5: [0, 5), [5, 10), ...
    max_features = df['Feature_Count'].max()
    # The last edge must exceed max_features, because right=False excludes each bin's right edge
    bins = range(0, max_features + 6, 5)
    df['Feature_Bucket'] = pd.cut(df['Feature_Count'], bins=bins, include_lowest=True, right=False)

    # Calculate median price per bucket, explicitly setting observed=False
    bucket_stats = df.groupby('Feature_Bucket', observed=False).agg(
        Median_Price=('Price', 'median'),
        Count=('Price', 'count')
    ).reset_index()

    print("\nRaw bucket statistics before filtering:")
    print(bucket_stats)

    # Filter out buckets with insufficient data (e.g., less than 5 cars)
    bucket_stats = bucket_stats[bucket_stats['Count'] >= 5]

    print("\nFiltered bucket statistics (Count >= 5):")
    print(bucket_stats)

    # Extract bucket midpoints and median prices for correlation
    bucket_midpoints = [float(interval.mid) for interval in bucket_stats['Feature_Bucket']]  # Convert Interval to float
    median_prices = bucket_stats['Median_Price'].astype(float)  # Ensure numeric type

    print("\nBucket midpoints:", bucket_midpoints)
    print("Median prices:", median_prices.tolist())

    # Calculate Pearson correlation coefficient if sufficient valid data
    if (len(bucket_midpoints) > 1 and
        not median_prices.isna().any() and
        all(isinstance(x, (int, float)) for x in bucket_midpoints)):
        correlation = np.corrcoef(bucket_midpoints, median_prices)[0, 1]
    else:
        correlation = np.nan
        print("Correlation not calculated: Insufficient valid data or invalid types.")

    # Print results
    print(f"\nCorrelation between Feature Count and Median Price: {correlation:.2f}")
    print("\nBucket Statistics:")
    print(bucket_stats)

    # Create a plot if there is valid data
    if bucket_midpoints and not median_prices.isna().any():
        plt.figure(figsize=(10, 6))
        plt.plot(bucket_midpoints, median_prices, 'bo-', label=f'Correlation: {correlation:.2f}')
        plt.title('Median Price vs. Number of Features (Ausstattungen & Extras)')
        plt.xlabel('Number of Features (Bucket Midpoint)')
        plt.ylabel('Median Price (€)')
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.legend()
        plt.tight_layout()

        plt.show()
    else:
        print("Plot not generated: No valid data for plotting.")

except Exception as error:
    print(f"An unexpected error occurred: {error}")

finally:
    # Close database connection
    if connection:
        cursor.close()
        connection.close()

### Getting coordinates based on contact address

This script connects to a PostgreSQL database, fetches addresses from the willhaben_items table, and uses the Nominatim geocoder from the geopy library to convert those addresses into geographic coordinates (latitude and longitude). For each valid geocoded address, it updates the corresponding database record with the calculated coordinates. The script also tracks the geocoding results, storing the coordinates for potential analysis or visualization. It includes error handling for invalid addresses and other geocoding issues, and respects the rate limit of the Nominatim API by optionally pausing between requests. After processing, the script closes the database connection to ensure proper resource management.
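
Nominatim's public API allows at most one request per second. The throttle can be written as a small wrapper around any geocoding function; this is a stdlib-only sketch (`rate_limited` is a hypothetical helper), with the clock and sleep functions injectable so the timing can be tested without actually waiting. geopy also ships a ready-made equivalent, `geopy.extra.rate_limiter.RateLimiter(geolocator.geocode, min_delay_seconds=1)`.

```python
import time


def rate_limited(func, min_delay_seconds=1.0, clock=time.monotonic, sleep=time.sleep):
    """Wrap func so that consecutive calls start at least min_delay_seconds apart."""
    last_call = None

    def wrapper(*args, **kwargs):
        nonlocal last_call
        if last_call is not None:
            wait = min_delay_seconds - (clock() - last_call)
            if wait > 0:
                sleep(wait)  # block until the minimum spacing has passed
        last_call = clock()  # measured from the start of each call
        return func(*args, **kwargs)

    return wrapper
```

Inside the loop, `geocode = rate_limited(geolocator.geocode, 1.0)` could then replace `geolocator.geocode`; keyword arguments such as `timeout=10` are passed through unchanged.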

In [None]:
import time

import psycopg2
from geopy.geocoders import Nominatim

# Initialize Nominatim geocoder (its usage policy asks for a user agent that identifies your application)
geolocator = Nominatim(user_agent="myGeocoder")

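# Initialize handles so the finally block is safe even if connecting fails
connection = None
cursor = None

try:
    # Establish connection to PostgreSQL database
    connection = psycopg2.connect(**db_params)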

    # Create a cursor to perform database operations
    cursor = connection.cursor()

    # Define the SQL query to fetch willhaben_code and contact_address
    query = """
    SELECT
        willhaben_code,
        contact_address
    FROM willhaben_items
    WHERE contact_address IS NOT NULL
    AND scrape_run_id = %s;
    """

    # Execute the query with scrape_run_id bound as a parameter
    cursor.execute(query, (scrape_run_id,))

    # Fetch all rows
    rows = cursor.fetchall()
    print(f"Fetched {len(rows)} rows from the database.")

    # Lists to store geocoded coordinates
    latitudes = []
    longitudes = []
    addresses = []
    codes = []

    # Geocode each address and update the table
    for row in rows:
        willhaben_code, contact_address = row
        try:
            # Geocode the address (10-second request timeout)
            location = geolocator.geocode(contact_address, timeout=10)
            if location:
                # Update the coordinates column with "latitude,longitude"
                cursor.execute("""
                    UPDATE willhaben_items
                    SET coordinates = %s
                    WHERE willhaben_code = %s;
                """, (f"{location.latitude},{location.longitude}", willhaben_code))

                connection.commit()
                # Store for plotting
                latitudes.append(location.latitude)
                longitudes.append(location.longitude)
                addresses.append(location.address)
                codes.append(willhaben_code)
                print(f"Geocoded and updated: {contact_address} -> ({location.latitude}, {location.longitude})")
            else:
                print(f"Failed to geocode: {contact_address}")
        except Exception as e:
            print(f"Error geocoding {contact_address}: {e}")
        # Respect Nominatim's usage policy (at most 1 request per second)
        time.sleep(1)

except Exception as error:
    print(f"An unexpected error occurred: {error}")

finally:
    # Close database connections
    if cursor:
        cursor.close()
    if connection:
        connection.close()
        print("Database connection closed.")

### Visualization of coordinates

This script connects to a PostgreSQL database and fetches willhaben_code, contact_address, and coordinates from the willhaben_items table for all rows where coordinates are available. It parses latitude and longitude from the coordinates field, skipping rows that cannot be parsed, and collects the values in lists. It then uses Plotly to draw a density heatmap of the listing locations, centered on the average coordinates, with a color scale representing the density of listings.
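
Parsing the stored `"latitude,longitude"` strings and choosing the map center can be made more defensive. A sketch with hypothetical helpers `parse_coordinates` and `map_center`: it rejects missing values and out-of-range coordinates, and uses the median rather than the mean for the center, since a few addresses geocoded to the wrong country would pull the mean far away from Austria.

```python
from statistics import median


def parse_coordinates(value):
    """Parse a "latitude,longitude" string; return (lat, lon), or None if invalid."""
    try:
        lat_text, lon_text = value.split(",")
        lat, lon = float(lat_text), float(lon_text)
    except (AttributeError, ValueError):  # None, wrong number of parts, non-numeric text
        return None
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        return None  # outside the valid latitude/longitude range
    return lat, lon


def map_center(points):
    """Median latitude and longitude: robust against a few mis-geocoded outliers."""
    return median(p[0] for p in points), median(p[1] for p in points)
```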

In [None]:
import psycopg2
from psycopg2 import Error
import numpy as np
import plotly.graph_objects as go
from IPython.display import display

# Database connection parameters
db_params = {
    'dbname': 'scraped',
    'user': 'scraped',
    'password': 'scraped',
    'host': '127.0.0.1',
    'port': '5432'
}

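# Initialize handles and result lists so the plotting code below works even if the query fails
connection = None
cursor = None
latitudes, longitudes, codes, addresses = [], [], [], []

try:
    # Establish connection to PostgreSQL database
    connection = psycopg2.connect(**db_params)
    print("Successfully connected to PostgreSQL database.")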

    # Create a cursor to perform database operations
    cursor = connection.cursor()

    # Define the SQL query to fetch coordinates, willhaben_code, and contact_address
    query = """
    SELECT
        willhaben_code,
        contact_address,
        coordinates
    FROM willhaben_items
    WHERE coordinates IS NOT NULL
    AND scrape_run_id = %s;
    """

    # Execute the query with scrape_run_id bound as a parameter
    cursor.execute(query, (scrape_run_id,))

    # Fetch all rows
    rows = cursor.fetchall()
    print(f"Fetched {len(rows)} rows from the database.")

    # Lists to store data for plotting
    latitudes = []
    longitudes = []
    codes = []
    addresses = []

    # Process rows (for coordinates column)
    for row in rows:
        willhaben_code, contact_address, coordinates = row
        try:
            # Parse coordinates from "latitude,longitude"
            lat, lon = map(float, coordinates.split(','))
            latitudes.append(lat)
            longitudes.append(lon)
            codes.append(willhaben_code)
            addresses.append(contact_address)
        except (AttributeError, ValueError):
            print(f"Invalid coordinates format for {willhaben_code}: {coordinates}")

    # Alternative: process rows for PostGIS (uncomment if using geom)
    # for row in rows:
    #     willhaben_code, contact_address, latitude, longitude = row
    #     latitudes.append(latitude)
    #     longitudes.append(longitude)
    #     codes.append(willhaben_code)
    #     addresses.append(contact_address)

except Exception as error:
    print(f"An unexpected error occurred: {error}")

finally:
    # Close database connections
    if cursor:
        cursor.close()
    if connection:
        connection.close()
        print("Database connection closed.")

# Create density heatmap
if latitudes and longitudes:
    fig = go.Figure(go.Densitymap(
        lat=latitudes,
        lon=longitudes,
        radius=20,  # Adjust for density spread
        opacity=0.7,
        zauto=True,
        colorscale='Hot',
        showscale=True,
        text=[f"{code}: {addr}" for code, addr in zip(codes, addresses)]
    ))

    # Update layout for map (Densitymap traces are drawn on the `map` subplot, not `mapbox`)
    fig.update_layout(
        title='Willhaben Listings Density Heatmap',
        map=dict(
            style='open-street-map',  # Free map style
            center=dict(
                lat=np.mean(latitudes),
                lon=np.mean(longitudes)
            ),
            zoom=1
        ),
        margin=dict(l=0, r=0, t=30, b=0)
    )

    # Display the plot
    display(fig)
else:
    print("No coordinates available to plot.")

### Prepare data for Kafka streaming

In [None]:
import json
import re
from datetime import datetime

import pandas as pd
import psycopg2

def parse_basisdaten(basis_str):
    try:
        return json.loads(basis_str)
    except (json.JSONDecodeError, TypeError):
        return {}

def extract_kw(leistung_str):
    if isinstance(leistung_str, str):
        match = re.search(r'(\d+)\s*kW', leistung_str)
        if match:
            return int(match.group(1))
    return None

def parse_erstzulassung(date_str):
    try:
        return pd.to_datetime(date_str, format='%m/%Y')
    except:
        return pd.NaT

def clean_kilometer(km_str):
    if isinstance(km_str, str):
        digits = re.sub(r'[^\d]', '', km_str)
        return int(digits) if digits else None
    return None

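# Initialize handles so the finally block is safe even if connecting fails
connection = None
cursor = None

try:
    # Connect to PostgreSQL
    connection = psycopg2.connect(**db_params)
    cursor = connection.cursor()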

    # Define and run the SQL query (scrape_run_id bound as a parameter)
    query = """
    SELECT
        willhaben_code,
        title,
        price,
        breadcrumbs,
        data->>'Basisdaten' AS basisdaten
    FROM willhaben_items
    WHERE scrape_run_id = %s
      AND data IS NOT NULL
      AND breadcrumbs IS NOT NULL
    """
    cursor.execute(query, (scrape_run_id,))
    rows = cursor.fetchall()

    # Define column names
    columns = ['willhaben_code', 'title', 'price', 'breadcrumbs', 'basisdaten']

    # Convert to DataFrame
    df = pd.DataFrame(rows, columns=columns)

    # Extract brand and model (the last two breadcrumb entries), stripping whitespace
    parts = df['breadcrumbs'].str.split(',')
    df['brand'] = parts.str[-2].str.strip()
    df['model'] = parts.str[-1].str.strip()
    df = df.drop(columns=['breadcrumbs'])
    df['price'] = df['price'].astype(float)

    # Expand basisdaten into columns
    basisdaten_expanded = df['basisdaten'].apply(parse_basisdaten).apply(pd.Series)
    df = pd.concat([df, basisdaten_expanded], axis=1)

    # Clean extracted fields
    df['Leistung'] = df['Leistung'].apply(extract_kw)
    
    # Normalize Erstzulassung from "4/2018" to "2018-04"; unparseable values become missing
    df['Erstzulassung'] = pd.to_datetime(df['Erstzulassung'], format='%m/%Y', errors='coerce').dt.strftime('%Y-%m')
    
    df['Kilometerstand'] = df['Kilometerstand'].apply(clean_kilometer)

    # Drop raw basisdaten column
    df = df.drop(columns=['basisdaten'])

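    # Replace NaN/NaT with None so the records serialize to valid JSON (null instead of NaN)
    df = df.astype(object).where(df.notna(), None)

    # Convert DataFrame to list of dicts
    json_data = df.to_dict(orient='records')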
    
finally:
    if connection:
        cursor.close()
        connection.close()

#### Send data to Kafka
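
The `value_serializer` in the cell below hands float NaN values (for example a missing `Kilometerstand` after the pandas cleanup) to `json.dumps`, which emits the bare token `NaN`; that is not valid JSON, and strict consumers will reject it. A sketch of stricter serializers with hypothetical names `serialize_value` and `serialize_key`: top-level NaN becomes `null` (nested values are not cleaned), `allow_nan=False` makes any remaining NaN raise instead of slipping through, and `willhaben_code` is used as the message key so that Kafka's default partitioner sends all messages for the same ad to the same partition.

```python
import json
import math


def _nan_to_none(value):
    # json.dumps would write float NaN as the invalid token NaN
    return None if isinstance(value, float) and math.isnan(value) else value


def serialize_value(record):
    """Value serializer: dict -> UTF-8 JSON bytes, with top-level NaN mapped to null."""
    clean = {key: _nan_to_none(value) for key, value in record.items()}
    return json.dumps(clean, ensure_ascii=False, allow_nan=False).encode("utf-8")


def serialize_key(willhaben_code):
    """Key serializer: ad code -> UTF-8 bytes."""
    return str(willhaben_code).encode("utf-8")

# Usage with kafka-python (not executed here):
# producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER,
#                          key_serializer=serialize_key, value_serializer=serialize_value)
# producer.send(KAFKA_TOPIC, key=record["willhaben_code"], value=record)
```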

In [None]:
from kafka import KafkaProducer
import json

# Kafka config
KAFKA_BROKER = '172.29.16.101:9092'
KAFKA_TOPIC = 'willhaben-items'

# Initialize Kafka producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
)

# Send each JSON object to Kafka
for record in json_data:
    producer.send(KAFKA_TOPIC, value=record)

producer.flush()
print(f"Sent {len(json_data)} messages to Kafka topic '{KAFKA_TOPIC}'")

### Map car brand production countries from external CSV file
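
A left merge silently leaves `NaN` for brands missing from the CSV and duplicates rows if a brand appears twice in it. A sketch on made-up data showing two `DataFrame.merge` options that surface both problems, `validate="many_to_one"` and `indicator=True`. The `Country` column name is an assumption about the CSV's layout; only `Brand` is known from the cell below.

```python
import pandas as pd

# Made-up listings and mapping; "Country" is an assumed column name
listings = pd.DataFrame({"brand": ["VW", "BMW", "Lada"]})
mapping = pd.DataFrame({"Brand": ["VW", "BMW"], "Country": ["Germany", "Germany"]})

merged = listings.merge(
    mapping,
    how="left",
    left_on="brand",
    right_on="Brand",
    validate="many_to_one",  # raises MergeError if a brand appears twice in the mapping
    indicator=True,          # adds a _merge column: "both" or "left_only"
)
unmatched = sorted(merged.loc[merged["_merge"] == "left_only", "brand"].unique())
merged = merged.drop(columns=["Brand", "_merge"])  # drop the duplicate key and helper column
```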

In [None]:
# Load the brand–country mapping CSV
brand_country_df = pd.read_csv("country_by_carbrands.csv")

# Merge to add the country based on the brand, then drop the duplicate 'Brand' key column from the CSV
df = df.merge(brand_country_df, how='left', left_on='brand', right_on='Brand').drop(columns=['Brand'])