<a href="https://colab.research.google.com/github/sudo-Oliver/Predictive-Analytics-Private/blob/main/notebooks/Preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install py7zr
!pip install duckdb
import pandas as pd
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
import gdown
import py7zr
import duckdb
import os
from pathlib import Path
from tqdm.notebook import tqdm

Collecting py7zr
  Downloading py7zr-0.22.0-py3-none-any.whl.metadata (16 kB)
Collecting texttable (from py7zr)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting pycryptodomex>=3.16.0 (from py7zr)
  Downloading pycryptodomex-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting pyzstd>=0.15.9 (from py7zr)
  Downloading pyzstd-0.16.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.4 kB)
Collecting pyppmd<1.2.0,>=1.1.0 (from py7zr)
  Downloading pyppmd-1.1.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting pybcj<1.1.0,>=1.0.0 (from py7zr)
  Downloading pybcj-1.0.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.9 kB)
Collecting multivolumefile>=0.2.3 (from py7zr)
  Downloading multivolumefile-0.2.3-py3-none-any.whl.metadata (6.3 kB)
Collecting inflate64<1.1.0,>=1.0.0 (from py7zr)
  Downloading inflate64-1.0.1-cp311-cp311-manylinux_2_17_

### Load Data

In [3]:
 #Provide the file ID
gdown.download("https://drive.google.com/uc?export=download&id=1ykDl_A5YRirIFUeKHCGJoaavaw6HiJqh", "sensor_data.7z", quiet=False)
 #Extract the .7z file to a specific folder
!7z e "/content/sensor_data.7z" -o"/content/data/raw"

Downloading...
From (original): https://drive.google.com/uc?export=download&id=1ykDl_A5YRirIFUeKHCGJoaavaw6HiJqh
From (redirected): https://drive.google.com/uc?export=download&id=1ykDl_A5YRirIFUeKHCGJoaavaw6HiJqh&confirm=t&uuid=b78a8390-2f92-42b9-9137-895cd98eaf78
To: /content/sensor_data.7z
100%|██████████| 592M/592M [00:07<00:00, 80.5MB/s]



7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.20GHz (406F0),ASM,AES-NI)

Scanning the drive for archives:
  0M Scan /content/                   1 file, 591843432 bytes (565 MiB)

Extracting archive: /content/sensor_data.7z
--
Path = /content/sensor_data.7z
Type = 7z
Physical Size = 591843432
Headers Size = 282
Method = LZMA2:24
Solid = +
Blocks = 1

  0%      0% - household_sensors_electric.csv                                       1% - household_sensors_electric.csv                                       2% - household_sensors_electric.csv                                       3% - household_sensors_

In [4]:
# Load metadata dataframes
df_home = pd.read_csv('https://raw.githubusercontent.com/sudo-Oliver/Predictive-Analytics-Private/refs/heads/main/data/raw/metadata/home.csv')
df_room = pd.read_csv('https://raw.githubusercontent.com/sudo-Oliver/Predictive-Analytics-Private/refs/heads/main/data/raw/metadata/room.csv')
df_sensor = pd.read_csv('https://raw.githubusercontent.com/sudo-Oliver/Predictive-Analytics-Private/refs/heads/main/data/raw/metadata/sensor.csv')
df_person = pd.read_csv('https://raw.githubusercontent.com/sudo-Oliver/Predictive-Analytics-Private/refs/heads/main/data/raw/metadata/person.csv')

# Load individual dataframes
df_electric = pd.read_csv('/content/data/raw/household_sensors_electric.csv', parse_dates=['timestamp_local'])
df_gas = pd.read_csv('/content/data/raw/household_sensors_gas.csv', parse_dates=['timestamp_local'])
df_sensor_room = pd.read_csv('/content/data/raw/room_and_appliance_sensors_room.csv', parse_dates=['timestamp_local'], engine="pyarrow")
df_tempprobe = pd.read_csv('/content/data/raw/household_sensors_tempprobe.csv', parse_dates=['timestamp_local'], engine="pyarrow")

### Clean Electric data
1. Filtering out unreliable data period (April 17, 2018, 08:00-10:00)
2. Get Room and Home IDs
3. Drop columns
4. interpolation for std_consumption

In [5]:
def clean_electric_data(df_electric):
    # Create a copy of the dataframe
    df_clean = df_electric.copy()

    # Create mask for unreliable period
    unreliable_start = pd.Timestamp('2018-04-17 08:00:00')
    unreliable_end = pd.Timestamp('2018-04-17 10:00:00')

    # Filter out unreliable data
    df_clean = df_clean[~((df_clean['timestamp_local'] >= unreliable_start) &
                            (df_clean['timestamp_local'] <= unreliable_end))]

    # Extract numeric part from consumer_id and rename to homeid
    df_clean['homeid'] = df_clean['consumer_id'].str.extract(r'(\d+)')
    df_clean = df_clean.drop(columns=['consumer_id'])

    # Split room into type and roomid
    df_clean[['type', 'roomid']] = df_clean['room'].str.extract(r'([a-zA-Z]+)(\d+)')
    df_clean = df_clean.drop(columns=['room'])

    # Remove mean_consumption column
    df_clean = df_clean.drop(columns=['mean_consumption'])

    # Interpolate missing values in std_consumption
    df_clean.set_index('timestamp_local', inplace=True)
    df_clean['std_consumption'] = df_clean['std_consumption'].interpolate(method='time')
    df_clean.reset_index(inplace=True)

    # Rename columns
    df_clean.rename(columns={
        'min_consumption': 'electric_min_consumption',
        'max_consumption': 'electric_max_consumption',
        'median_consumption': 'electric_median_consumption',
        'total_consumption_Wh': 'electric_total_consumption_Wh',
        'sensor': 'sensorid'
    }, inplace=True)

    return df_clean

# Apply all cleaning steps at once
df_electric_clean = clean_electric_data(df_electric)
df_electric_clean.head()



Unnamed: 0.1,timestamp_local,Unnamed: 0,sensorid,electric_min_consumption,electric_max_consumption,std_consumption,electric_median_consumption,electric_total_consumption_Wh,homeid,type,roomid
0,2017-03-07 15:00:00,0,4472,0.0,0.34,0.047283,0.251,0.233624,100,livingroom,1038
1,2017-03-07 16:00:00,1,4472,0.038,2.321,0.785224,0.243,0.677716,100,livingroom,1038
2,2017-03-07 17:00:00,2,4472,0.083,4.746,0.670984,0.404,0.567129,100,livingroom,1038
3,2017-03-07 18:00:00,3,4472,0.048,0.792,0.125662,0.299,0.312481,100,livingroom,1038
4,2017-03-07 19:00:00,4,4472,0.044,1.278,0.043239,0.226,0.20986,100,livingroom,1038


### Clean gas data

In [6]:
def clean_gas_data(df_gas):
    # Create a copy of the dataframe
    df_clean = df_gas.copy()

    # Create mask for unreliable period
    unreliable_start = pd.Timestamp('2018-04-17 08:00:00')
    unreliable_end = pd.Timestamp('2018-04-17 10:00:00')

    # Filter out unreliable data
    df_clean = df_clean[~((df_clean['timestamp_local'] >= unreliable_start) &
                            (df_clean['timestamp_local'] <= unreliable_end))]

    # Extract numbers from consumer_id and room
    df_clean['homeid'] = df_clean['consumer_id'].str.extract(r'(\d+)')
    df_clean['roomid'] = df_clean['room'].str.extract(r'(\d+)')

    # Drop columns
    df_clean = df_clean.drop(columns=['std_consumption', 'consumer_id', 'room'])

    # Rename columns
    df_clean.rename(columns={
        'mean_consumption': 'gas_mean_consumption',
        'min_consumption': 'gas_min_consumption',
        'max_consumption': 'gas_max_consumption',
        'median_consumption': 'gas_median_consumption',
        'total_consumption_Wh': 'gas_total_consumption_Wh',
        'sensor': 'sensorid'
    }, inplace=True)
    return df_clean

df_gas_clean = clean_gas_data(df_gas)

### Household_sensors_tempprobe Preprocessing

In [7]:
def clean_data(df_tempprobe):
    # Create a copy for cleaning
    df_clean = df_tempprobe.copy()

    # Create mask for unreliable period
    unreliable_start = pd.Timestamp('2018-04-17 08:00:00')
    unreliable_end = pd.Timestamp('2018-04-17 10:00:00')

    # Filter out unreliable data
    df_clean = df_clean[~((df_clean['timestamp_local'] >= unreliable_start) &
                            (df_clean['timestamp_local'] <= unreliable_end))]

    # Extract numeric part from consumer_id
    df_clean['homeid'] = df_clean['consumer_id'].str.extract(r'(\d+)')
    # Split 'room' into 'roomid' and 'type'
    df_clean[['type', 'roomid']] = df_clean['room'].str.extract(r'([a-zA-Z]+)(\d+)')
    # Remove specified temperature columns
    df_clean = df_clean.drop(columns=['mean_temperature', 'min_temperature', 'std_temperature', 'max_temperature', 'room', 'consumer_id'])
    # Map text values to numbers
    mapping = {
        'central-heating-flow': 1,
        'hot-water-cold-pipe': 2,
        'central-heating-return': 3,
        'hot-water-hot-pipe': 4
    }
    # Apply mapping to the 'measured_entity' column
    df_clean['measured_entity'] = df_clean['measured_entity'].map(mapping)
    df_clean = df_clean.rename(columns={'sensor': 'sensorid'})
    return df_clean

df_tempprobe_clean = clean_data(df_tempprobe.copy())
df_tempprobe_clean.head()
df_tempprobe_clean

Unnamed: 0,Unnamed: 1,timestamp_local,sensorid,measured_entity,median_temperature,homeid,type,roomid
0,0,2017-03-07 15:00:00,4450,3,25.5,100,kitchen,1037
1,1,2017-03-07 16:00:00,4450,3,19.0,100,kitchen,1037
2,2,2017-03-07 17:00:00,4450,3,17.5,100,kitchen,1037
3,3,2017-03-07 18:00:00,4450,3,17.5,100,kitchen,1037
4,4,2017-03-07 19:00:00,4450,3,17.0,100,kitchen,1037
...,...,...,...,...,...,...,...,...
5719128,9008,2018-06-30 20:00:00,4376,4,25.5,99,kitchen,1026
5719129,9009,2018-06-30 21:00:00,4376,4,27.3,99,kitchen,1026
5719130,9010,2018-06-30 22:00:00,4376,4,25.8,99,kitchen,1026
5719131,9011,2018-06-30 23:00:00,4376,4,25.5,99,kitchen,1026


### Room and appliance sensor preprocessing

In [8]:
def clean_room_app_data(df_sensor_room):
    # Create a copy for cleaning
    df_clean = df_sensor_room.copy()

    # Create mask for unreliable period
    unreliable_start = pd.Timestamp('2018-04-17 08:00:00')
    unreliable_end = pd.Timestamp('2018-04-17 10:00:00')

    # Filter out unreliable data
    df_clean = df_clean[~((df_clean['timestamp_local'] >= unreliable_start) &
                            (df_clean['timestamp_local'] <= unreliable_end))]

    # Extract numbers from consumer_id and room
    # Lösung mit Numpy wegen Laufzeit und RAM
    consumer_id_array = df_clean['consumer_id'].values
    room_array = df_clean['room'].values

    df_clean['homeid'] = extract_digits_numpy(consumer_id_array)
    df_clean['roomid'] = extract_digits_numpy(room_array)

    # Filter out all rows where measured entity is not temperature (such as humidity)
    df_clean = df_clean.query('measurement == "temperature"')

    # Drop columns
    df_clean = df_clean.drop(columns=['mean_value', 'min_value', 'max_value', 'std_value', 'consumer_id', 'room'])

    # Rename columns
    df_clean.rename(columns={
        'sensor': 'sensorid'
    }, inplace=True)

    return df_clean

def extract_digits_numpy(arr):
    return np.array([''.join(c for c in s if c.isdigit()) for s in arr], dtype=int)

df_sensor_room_clean = clean_room_app_data(df_sensor_room)

### Sensor.csv Preprocessing

In [None]:
df_sensor = df_sensor[['sensorid', 'status']]
df_sensor

### Prepare metadata for income_band, education

In [None]:
# Prepare income_band data
df_home = df_home[["homeid", "income_band"]].groupby('homeid', as_index=False).sum()

income_midpoints = {
    "Missing": 0,
    "less than £10,800": 1,
    "£10,800 to £13,499": 2,
    "£13,500 to £16,199": 3,
    "£16,200 to £19,799": 4,
    "£19,800 to £23,399": 5,
    "£23,400 to £26,999": 6,
    "£27,000 to £32,399": 7,
    "£32,400 to £37,799": 8,
    "£37,800 to £43,199": 9,
    "£43,200 to £48,599": 10,
    "£48,600 to £53,999": 11,
    "£54,000 to £65,999": 12,
    "£66,000 to £77,999": 13,
    "£78,000 to £89,999": 14,
    "£90,000 or more": 15
}

df_home["income_band_mid"] = df_home["income_band"].map(income_midpoints)
df_home = df_home.drop(columns=['income_band'])

# Prepare education data per home
df_person['education'] = df_person['education'].str.strip().str.lower()
df_person['education'] = df_person['education'].fillna('unknown')

# Map qualifications to numerical values
qualification_mapping = {
    "phd": 9,  # Highest qualification
    "degree level qualification (or equivalent), e.g. bsc, ba, msc, ma": 8,
    "higher educational qualification below degree level": 7,
    "onc / national level btec": 6,
    "a-levels or highers": 5,
    "gcse grade d-g or cse grade 2-5 or standard grade level 4-6": 4,
    "o level or gcse equivalent (grade a-c) or o grade/cse equivalent (grade 1) or standard grade level 1": 3,
    "other qualifications": 2,
    "no formal qualifications": 1,
    "unknown": 0 } # Lowest qualification
df_person['education_map'] = df_person['education'].map(qualification_mapping)

df_person = df_person[["homeid", "education_map"]]

# Choose highest education
highest_education = df_person.groupby('homeid')['education_map'].max().reset_index()
df_person.drop(columns=['education_map'], inplace=True)
df_person = df_person.drop_duplicates()
person_education = pd.merge(df_person, highest_education, on='homeid', how='inner')

# Map home and person including education
df_home_edu_income = pd.merge(df_home, person_education, on='homeid', how='inner')
df_home_edu_income

### TODO
- Merge all dataframes on timestamp
- Join education and income data for every row on same homeid
- Delete data from sensor where sensor appears as faulty or offline

# DuckDB Transformation


In [11]:
def verify_dataframes():
    """Print info about preprocessed DataFrames"""
    print("Electric homes:", df_electric_clean['homeid'].nunique())
    print("Gas homes:", df_gas_clean['homeid'].nunique())
    print("Temp homes:", df_tempprobe_clean['homeid'].nunique())
    print("Room homes:", df_sensor_room_clean['homeid'].nunique())
    print("Metadata homes:", df_home_edu_income['homeid'].nunique())

    # 1. First verify data
verify_dataframes()

Electric homes: 254
Gas homes: 255
Temp homes: 255
Room homes: 255
Metadata homes: 255


In [12]:
def merge_and_export_chunks(chunk_size=10):
    # Create connection
    con = duckdb.connect(':memory:')

    # Register DataFrames
    con.register('df_electric_clean', df_electric_clean)
    con.register('df_gas_clean', df_gas_clean)
    con.register('df_tempprobe_clean', df_tempprobe_clean)
    con.register('df_sensor_room_clean', df_sensor_room_clean)
    con.register('df_home_edu_income', df_home_edu_income)

    # Get homes from metadata
    homes = con.execute("""
        SELECT DISTINCT homeid
        FROM df_home_edu_income
        ORDER BY homeid
    """).fetchall()

    # Create output directory
    if not os.path.exists('merged_data'):
        os.makedirs('merged_data')

    # Process in chunks
    for i in range(0, len(homes), chunk_size):
        home_chunk = homes[i:i + chunk_size]
        home_ids = ','.join(str(h[0]) for h in home_chunk)

        chunk_query = f"""
        SELECT
            COALESCE(e.timestamp_local, g.timestamp_local, t.timestamp_local, r.timestamp_local) as timestamp_local,
            h.homeid,
            e.electric_total_consumption_Wh,
            g.gas_total_consumption_Wh,
            t.median_temperature,
            r.median_value as room_temperature,
            h.education_map,
            h.income_band_mid
        FROM df_home_edu_income h
        LEFT JOIN df_electric_clean e ON h.homeid = e.homeid
        LEFT JOIN df_gas_clean g ON h.homeid = g.homeid AND e.timestamp_local = g.timestamp_local
        LEFT JOIN df_tempprobe_clean t ON h.homeid = t.homeid AND e.timestamp_local = t.timestamp_local
        LEFT JOIN df_sensor_room_clean r ON h.homeid = r.homeid AND e.timestamp_local = r.timestamp_local
        WHERE h.homeid IN ({home_ids})
        ORDER BY h.homeid, timestamp_local
        """

        chunk_df = con.execute(chunk_query).df()
        chunk_df.to_parquet(f'merged_data/chunk_{i}.parquet')

        # Cleanup after each chunk
        del chunk_df
        import gc
        gc.collect()

    con.close()

# Only execute merge
merge_and_export_chunks()

In [13]:
def validate_merged_chunks():
    """Validate merged data chunks"""
    merged_dir = Path('merged_data')
    chunks = sorted(merged_dir.glob('*.parquet'))

    chunk_info = {}
    for chunk in tqdm(chunks, desc="Validating chunks"):
        df = pd.read_parquet(chunk)
        chunk_info[chunk.name] = {
            'homes': df['homeid'].unique(),
            'rows': len(df),
            'start_date': df['timestamp_local'].min(),
            'end_date': df['timestamp_local'].max()
        }

    total_homes = set().union(*[set(info['homes']) for info in chunk_info.values()])
    total_rows = sum(info['rows'] for info in chunk_info.values())

    print(f"\nValidation Results:")
    print(f"Number of chunks: {len(chunks)}")
    print(f"Total unique homes: {len(total_homes)}")
    print(f"Total rows: {total_rows}")
    print(f"Date range: {min(info['start_date'] for info in chunk_info.values())} to "
          f"{max(info['end_date'] for info in chunk_info.values())}")

    return len(total_homes) == 255

def load_complete_dataset():
    """Load all chunks into single DataFrame"""
    merged_dir = Path('merged_data')
    all_chunks = sorted(merged_dir.glob('*.parquet'))

    print("Loading complete dataset...")
    merged_df = pd.concat([
        pd.read_parquet(f) for f in tqdm(all_chunks, desc="Loading chunks")
    ], ignore_index=True)

    merged_df = merged_df.sort_values(['homeid', 'timestamp_local'])
    print(f"\nLoaded dataset with shape: {merged_df.shape}")

    return merged_df

    # Execute validation and loading
if validate_merged_chunks():
    print("\nValidation successful! Loading complete dataset...")
    merged_df = load_complete_dataset()
else:
    print("\nValidation failed! Please check merged data.")

Validating chunks:   0%|          | 0/26 [00:00<?, ?it/s]


Validation Results:
Number of chunks: 26
Total unique homes: 255
Total rows: 42087148
Date range: 2016-08-10 11:00:00 to 2018-07-01 00:00:00

Validation successful! Loading complete dataset...
Loading complete dataset...


Loading chunks:   0%|          | 0/26 [00:00<?, ?it/s]


Loaded dataset with shape: (42087148, 8)


# BackUp

In [43]:
def verify_dataframes():
    print("Electric data:", df_electric_clean.shape, "Null values:", df_electric_clean.isnull().sum().sum())
    print("Gas data:", df_gas_clean.shape, "Null values:", df_gas_clean.isnull().sum().sum())
    print("Tempprobe data:", df_tempprobe_clean.shape, "Null values:", df_tempprobe_clean.isnull().sum().sum())
    print("Room data:", df_sensor_room_clean.shape, "Null values:", df_sensor_room_clean.isnull().sum().sum())
    print("Education/Income data:", df_home_edu_income.shape, "Null values:", df_home_edu_income.isnull().sum().sum())

verify_dataframes()

def report_df_stats(df, stage_name):
    """Report dataframe statistics"""
    dupes = df.duplicated(subset=['timestamp_local', 'homeid']).sum()
    print(f"\n{stage_name}:")
    print(f"Shape: {df.shape}")
    print(f"Duplicates: {dupes}")

def resample_and_interpolate(group):
    """Resample group data to hourly frequency with duplicate handling"""
    # Save homeid and measured_entity
    homeid = group['homeid'].iloc[0]
    measured_entity = group['measured_entity'].iloc[0]

    # Remove measured_entity before interpolation
    group_numeric = group.drop('measured_entity', axis=1)

    # Set timestamp index and handle duplicates
    group_numeric = group_numeric.set_index('timestamp_local')
    group_numeric = group_numeric.loc[~group_numeric.index.duplicated(keep='first')]

    # Create complete hourly range
    full_range = pd.date_range(
        start=group_numeric.index.min(),
        end=group_numeric.index.max(),
        freq='h'
    )

    # Reindex and interpolate
    resampled = group_numeric.reindex(full_range).interpolate(method='time')

    # Add back homeid and measured_entity
    resampled['homeid'] = homeid
    resampled['measured_entity'] = measured_entity

    return resampled

def merge_sensor_data():
    # Prepare clean dataframes
    def deduplicate_df(df, name):
      report_df_stats(df, f"Initial {name} data")
      if name == 'tempprobe':
          # Handle measured_entity separately for tempprobe
          measured_entity_df = df.groupby(['timestamp_local', 'homeid'])['measured_entity'].first().reset_index()
          # Calculate means for other columns
          numeric_df = df.drop('measured_entity', axis=1).groupby(['timestamp_local', 'homeid']).mean(numeric_only=True).reset_index()
          # Merge back together
          return pd.merge(numeric_df, measured_entity_df, on=['timestamp_local', 'homeid'], how='left')
      else:
          # Normal handling for other dataframes
          return df.groupby(['timestamp_local', 'homeid']).mean(numeric_only=True).reset_index()

    # Clean individual dataframes
    electric = deduplicate_df(df_electric_clean, 'electric')
    gas = deduplicate_df(df_gas_clean, 'gas')
    tempprobe = deduplicate_df(df_tempprobe_clean, 'tempprobe')
    room = deduplicate_df(df_sensor_room_clean, 'room')

    # Convert to Int64
    for df in [electric, gas, tempprobe, room]:
        df['homeid'] = df['homeid'].astype(int)

    # Merge 1: Electric + Gas
    df_combined = pd.merge(
        electric, gas,
        on=['timestamp_local', 'homeid'],
        how='inner',
        suffixes=('_electric', '_gas')
    )
    report_df_stats(df_combined, 'After electric-gas merge')

    # Merge 2: Add Tempprobe
    df_combined = pd.merge(
        df_combined, tempprobe,
        on=['timestamp_local', 'homeid'],
        how='inner',
        suffixes=('', '_tempprobe')
    )
    report_df_stats(df_combined, 'After tempprobe merge')

    # Merge 3: Add Room
    df_combined = pd.merge(
        df_combined, room,
        on=['timestamp_local', 'homeid'],
        how='inner',
        suffixes=('', '_room')
    )
    report_df_stats(df_combined, 'After room merge')

    # Merge 4: Add income and education
    df_combined = pd.merge(
      df_combined,
      df_home_edu_income,
      on='homeid',
      how='inner'
    )
    df_combined['income_band_mid'] = df_combined['income_band_mid'].astype(int)
    df_combined['education_map'] = df_combined['education_map'].astype(int)
    report_df_stats(df_combined, 'After income and education merge')

    df_combined = df_combined.sort_values(['homeid', 'timestamp_local'])
    # 7. Handle missing values and duplicates
    # Process groups and reset index properly
    df_combined = (df_combined.groupby('homeid')
              .apply(resample_and_interpolate)
              .reset_index(level=0, drop=True)  # Drop the group index
              .reset_index()  # Convert timestamp index to column
              .rename(columns={'index': 'timestamp_local'}))
    df_combined = df_combined.drop(columns=['Unnamed: 0_electric', 'Unnamed: 0_gas', ''])
    return df_combined

df_merged = merge_sensor_data()

Electric data: (1528854, 11) Null values: 0
Gas data: (634392, 10) Null values: 0
Tempprobe data: (5716723, 8) Null values: 0
Room data: (11429686, 7) Null values: 0
Education/Income data: (255, 3) Null values: 0

Initial electric data:
Shape: (1528854, 11)
Duplicates: 0

Initial gas data:
Shape: (634392, 10)
Duplicates: 0

Initial tempprobe data:
Shape: (5716723, 8)
Duplicates: 4193882

Initial room data:
Shape: (11429686, 7)
Duplicates: 9889260

After electric-gas merge:
Shape: (629411, 16)
Duplicates: 0

After tempprobe merge:
Shape: (621611, 20)
Duplicates: 0

After room merge:
Shape: (618735, 24)
Duplicates: 0

After income and education merge:
Shape: (618735, 26)
Duplicates: 0


  .apply(resample_and_interpolate)


In [50]:
print(f"Homes: {df_merged['homeid'].nunique()}")

Homes: 254


### Merge the Dataframes with Pandas Only Method

In [32]:
# Convert homeid to int64 in all relevant dataframes
df_electric_clean['homeid'] = df_electric_clean['homeid'].astype(int)
df_gas_clean['homeid'] = df_gas_clean['homeid'].astype(int)
df_tempprobe_clean['homeid'] = df_tempprobe_clean['homeid'].astype(int)
df_sensor_room_clean['homeid'] = df_sensor_room_clean['homeid'].astype(int)

# 1. First merge sensor status with sensor readings and clean up columns
df_electric_with_status = pd.merge(
    df_electric_clean,
    df_sensor[['sensorid', 'status']],
    on='sensorid',
    how='inner'
)

# 2. Filter out inactive/faulty sensors and drop unnecessary columns
df_electric_active = df_electric_with_status[df_electric_with_status['status'] == 'active'].copy()
df_electric_active = df_electric_active.drop(['status'], axis=1)

# 3. Similarly merge and filter other sensor data
df_gas_active = pd.merge(
    df_gas_clean,
    df_sensor[['sensorid', 'status']],
    on='sensorid',
    how='inner'
).query('status == "active"')

df_tempprobe_active = pd.merge(
    df_tempprobe_clean,
    df_sensor[['sensorid', 'status']],
    on='sensorid',
    how='inner'
).query('status == "active"')

df_sensor_room_active = pd.merge(
    df_sensor_room_clean,
    df_sensor[['sensorid', 'status']],
    on='sensorid',
    how='inner'
).query('status == "active"')

# AB hier GPU RAM und nicht System RAM verwenden
# 4. Merge all sensor data based on timestamp and homeid
# First merge electric and gas data
df_combined = pd.merge(
    df_electric_active,
    df_gas_active.drop(['status'] + ['Unnamed: 0'] if 'Unnamed: 0' in df_gas_active.columns else [], axis=1),
    on=['timestamp_local', 'homeid'],
    how='inner',
    suffixes=('_electric', '_gas')
)

# Then merge tempprobe data
columns_to_drop_tempprobe = ['status'] + ['Unnamed: 0'] if 'Unnamed: 0' in df_tempprobe_active.columns else []
df_combined = pd.merge(
    df_combined,
    df_tempprobe_active.drop(columns_to_drop_tempprobe, axis=1),
    on=['timestamp_local', 'homeid'],
    how='inner',
    suffixes=('', '_tempprobe')
)

# Finally merge room sensor data
columns_to_drop_room = ['status'] + ['Unnamed: 0'] if 'Unnamed: 0' in df_sensor_room_active.columns else []
df_combined = pd.merge(
    df_combined,
    df_sensor_room_active.drop(columns_to_drop_room, axis=1),
    on=['timestamp_local', 'homeid'],
    how='inner',
    suffixes=('', '_room')
)

# Ensure homeid is int
df_combined['homeid'] = df_combined['homeid'].astype(int)

# 5. Merge with education and income data
final_df = pd.merge(
    df_combined,
    df_home_edu_income,
    on='homeid',
    how='inner'
)

# 6. Sort before resampling
final_df = final_df.sort_values(['homeid', 'timestamp_local'])

# 7. Handle missing values and duplicates
def resample_and_interpolate(group):
    """Resample group data to hourly frequency with duplicate handling"""
    # Save homeid
    homeid = group['homeid'].iloc[0]

    # Set timestamp index and handle duplicates
    group = group.set_index('timestamp_local')
    group = group.loc[~group.index.duplicated(keep='first')]

    # Create complete hourly range
    full_range = pd.date_range(
        start=group.index.min(),
        end=group.index.max(),
        freq='h'
    )

    # Reindex and interpolate
    resampled = group.reindex(full_range).interpolate(method='time')

    # Add back homeid
    resampled['homeid'] = homeid
    return resampled

# Process groups and reset index properly
final_df = (final_df.groupby('homeid')
           .apply(resample_and_interpolate)
           .reset_index(level=0, drop=True)  # Drop the group index
           .reset_index()  # Convert timestamp index to column
           .rename(columns={'index': 'timestamp_local'}))

# Verify results
print("\nData validation:")
print(f"Total homes: {final_df['homeid'].nunique()}")
print(f"Date range: {final_df['timestamp_local'].min()} to {final_df['timestamp_local'].max()}")
print(f"Shape: {final_df.shape}")

  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='time')
  resampled = group.reindex(full_range).interpolate(method='ti


Data validation:
Total homes: 254
Date range: 2016-09-15 17:00:00 to 2018-07-01 00:00:00
Shape: (1641653, 33)


In [33]:
final_df.head(n=1000000)

Unnamed: 0.1,timestamp_local,Unnamed: 0,sensorid_electric,electric_min_consumption,electric_max_consumption,std_consumption,electric_median_consumption,electric_total_consumption_Wh,homeid,type,...,roomid,status,_room,sensorid_room,measurement,median_value,roomid_room,status_room,income_band_mid,education_map
0,2016-09-20 09:00:00,982.000000,1216.0,0.069000,0.335000,0.033905,0.194000,0.179807,47,hall,...,650,active,983.000000,1200.0,temperature,20.500,652.0,active,0.0,8.0
1,2016-09-20 10:00:00,983.000000,1216.0,0.068875,0.458375,0.035875,0.187625,0.176690,47,,...,,,984.000000,1200.0,,20.475,652.0,,0.0,8.0
2,2016-09-20 11:00:00,984.000000,1216.0,0.068750,0.581750,0.037846,0.181250,0.173574,47,,...,,,985.000000,1200.0,,20.450,652.0,,0.0,8.0
3,2016-09-20 12:00:00,985.000000,1216.0,0.068625,0.705125,0.039817,0.174875,0.170457,47,,...,,,986.000000,1200.0,,20.425,652.0,,0.0,8.0
4,2016-09-20 13:00:00,986.000000,1216.0,0.068500,0.828500,0.041788,0.168500,0.167340,47,,...,,,987.000000,1200.0,,20.400,652.0,,0.0,8.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
999995,2018-04-10 23:00:00,4949.666667,9718.0,0.169333,2.530333,0.403364,0.361333,0.441567,180,,...,,,4943.666667,9696.0,,17.200,1661.0,,8.0,5.0
999996,2018-04-11 00:00:00,4950.555556,9718.0,0.168778,2.322444,0.343371,0.356444,0.421918,180,,...,,,4944.555556,9696.0,,16.900,1661.0,,8.0,5.0
999997,2018-04-11 01:00:00,4951.444444,9718.0,0.168222,2.114556,0.283378,0.351556,0.402268,180,,...,,,4945.444444,9696.0,,16.600,1661.0,,8.0,5.0
999998,2018-04-11 02:00:00,4952.333333,9718.0,0.167667,1.906667,0.223386,0.346667,0.382618,180,,...,,,4946.333333,9696.0,,16.300,1661.0,,8.0,5.0


### Plan for Saving and Loading DataFrame as Parquet
1. Save final DataFrame to parquet
2. Create load function
3. Add data verification

In [52]:
def save_processed_data(df, filename='final_processed_data3.parquet'):
    """Save processed DataFrame to parquet"""
    # Create data directory if it doesn't exist
    if not os.path.exists('processed_data'):
        os.makedirs('processed_data')

    # Save to parquet
    filepath = os.path.join('processed_data', filename)
    df.to_parquet(filepath)
    print(f"Data saved to {filepath}")

    # Print verification info
    print(f"\nSaved data info:")
    print(f"Shape: {df.shape}")
    print(f"Homes: {df['homeid'].nunique()}")
    print(f"Date range: {df['timestamp_local'].min()} to {df['timestamp_local'].max()}")

def load_processed_data(filename='final_processed_data2.parquet'):
    """Load processed data from parquet"""
    filepath = os.path.join('processed_data', filename)
    df = pd.read_parquet(filepath)
    return df

# Save processed DataFrame
save_processed_data(df_merged)

# Later, load the data:
# processed_df = load_processed_data()

Data saved to processed_data/final_processed_data3.parquet

Saved data info:
Shape: (1641653, 23)
Homes: 254
Date range: 2016-09-15 17:00:00 to 2018-07-01 00:00:00
