In [10]:
import happybase
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, avg, date_format, current_timestamp, round as spark_round

# --- 1. KONFIGURACJA ---
HBASE_HOST = 'localhost'
HBASE_TABLE = 'air_quality_metrics'
HIVE_RAW_TABLE = 'air_quality'

# --- 2. FUNKCJE HBASE ---
def recreate_table():
    """Usuwa tabelę jeśli istnieje i tworzy nową (Clean Slate)."""
    try:
        conn = happybase.Connection(HBASE_HOST)
        tables = conn.tables()
        
        if HBASE_TABLE.encode() in tables:
            print(f"Tabela '{HBASE_TABLE}' już istnieje. Usuwam starą wersję...")
            if conn.is_table_enabled(HBASE_TABLE):
                conn.disable_table(HBASE_TABLE)
            conn.delete_table(HBASE_TABLE)
        
        print(f"Tworzę nową, czystą tabelę HBase: {HBASE_TABLE}")
        conn.create_table(HBASE_TABLE, {'stats': dict(), 'meta': dict()})
        conn.close()
    except Exception as e:
        print(f"Błąd krytyczny przy tworzeniu tabeli: {e}")

def save_partition_to_hbase(iterator):
    try:
        conn = happybase.Connection(HBASE_HOST)
        table = conn.table(HBASE_TABLE)
        batch = table.batch()
        
        for row in iterator:
            row_key = f"{row.location_name}#{row.data_pomiaru}"
            
            data = {
                b'meta:location': str(row.location_name).encode('utf-8'),
                b'meta:date': str(row.data_pomiaru).encode('utf-8'),
                b'meta:processed_at': str(row.processed_at).encode('utf-8')
            }
            
            row_dict = row.asDict()
            meta_cols = ['location_name', 'data_pomiaru', 'processed_at']
            
            for col_name, value in row_dict.items():
                if col_name not in meta_cols and value is not None:
                    hb_col = f'stats:{col_name}'.encode('utf-8')
                    hb_val = str(value).encode('utf-8')
                    data[hb_col] = hb_val
            
            batch.put(row_key, data)
        
        batch.send()
        conn.close()
    except Exception as e:
        print(f"Błąd zapisu w partycji: {e}")

# --- 3. LOGIKA SPARKA ---
spark = SparkSession.builder \
    .appName("AirQuality_Full_Pivot_Reload") \
    .enableHiveSupport() \
    .getOrCreate()

print("1. Pobieranie danych z Hive...")
df_raw = spark.table(HIVE_RAW_TABLE)

print("2. Czyszczenie danych...")
# A. Parsowanie daty
df_parsed = df_raw.withColumn("ts_parsed", to_timestamp(col("datetimelocal"), "dd-MM-yyyy HH:mm:ss"))

# B. Filtrowanie: Tylko od 1 GRUDNIA 2025
print("   -> Filtruję dane od 2025-12-01...")
df_filtered = df_parsed.filter(col("ts_parsed").isNotNull()) \
                       .filter(col("ts_parsed") >= "2025-12-01") # <--- ZMIANA DATY

# C. Usuwanie duplikatów (Stacja + Czas + Parametr)
df_cleaned = df_filtered.dropDuplicates(['location_name', 'datetimelocal', 'parameter'])

# Formatowanie daty do grupowania (YYYY-MM-DD)
df_final = df_cleaned.withColumn("data_pomiaru", date_format(col("ts_parsed"), "yyyy-MM-dd"))

print("3. Agregacja (Pivot)...")
pivot_df = df_final.groupBy("location_name", "data_pomiaru") \
    .pivot("parameter") \
    .agg(spark_round(avg("value"), 2)) \
    .withColumn("processed_at", current_timestamp())

print("   Powstałe kolumny:")
pivot_df.printSchema()
print(f"   Liczba wierszy (Dzień x Stacja) do wstawienia: {pivot_df.count()}")

print("4. Resetowanie tabeli HBase...")
recreate_table()

print("5. Zapisywanie do HBase...")
pivot_df.foreachPartition(save_partition_to_hbase)

print("--- SUKCES! Załadowano dane od 1 grudnia 2025 ---")

1. Pobieranie danych z Hive...
2. Czyszczenie danych...
   -> Filtruję dane od 2025-12-01...
3. Agregacja (Pivot)...


                                                                                

   Powstałe kolumny:
root
 |-- location_name: string (nullable = true)
 |-- data_pomiaru: string (nullable = true)
 |-- pm1: double (nullable = true)
 |-- pm25: double (nullable = true)
 |-- relativehumidity: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- um003: double (nullable = true)
 |-- processed_at: timestamp (nullable = false)



                                                                                

   Liczba wierszy (Dzień x Stacja) do wstawienia: 195
4. Resetowanie tabeli HBase...
Tabela 'air_quality_metrics' już istnieje. Usuwam starą wersję...
Tworzę nową, czystą tabelę HBase: air_quality_metrics
5. Zapisywanie do HBase...




--- SUKCES! Załadowano dane od 1 grudnia 2025 ---


                                                                                

In [13]:
# Szybki test odczytu
conn = happybase.Connection('localhost')
table = conn.table('air_quality_metrics')

print("Pobieram 5 przykładowych wierszy z HBase:")
for key, data in table.scan(limit=5):
    print(f"Klucz: {key.decode()}")
    print(f"   Dane: {data}")
conn.close()

Pobieram 5 przykładowych wierszy z HBase:
Klucz: Belgradzka#2025-12-01
   Dane: {b'meta:date': b'2025-12-01', b'meta:location': b'Belgradzka', b'meta:processed_at': b'2026-01-09 23:10:52.732000', b'stats:pm1': b'25.6', b'stats:pm25': b'46.7', b'stats:relativehumidity': b'60.3', b'stats:temperature': b'9.56', b'stats:um003': b'4880.0'}
Klucz: Belgradzka#2025-12-02
   Dane: {b'meta:date': b'2025-12-02', b'meta:location': b'Belgradzka', b'meta:processed_at': b'2026-01-09 23:10:52.732000', b'stats:pm1': b'18.8', b'stats:pm25': b'34.9', b'stats:relativehumidity': b'62.8', b'stats:temperature': b'8.91', b'stats:um003': b'3230.0'}
Klucz: Belgradzka#2025-12-03
   Dane: {b'meta:date': b'2025-12-03', b'meta:location': b'Belgradzka', b'meta:processed_at': b'2026-01-09 23:10:52.732000', b'stats:pm1': b'17.4', b'stats:pm25': b'33.0', b'stats:relativehumidity': b'61.1', b'stats:temperature': b'10.0', b'stats:um003': b'3030.0'}
Klucz: Belgradzka#2025-12-04
   Dane: {b'meta:date': b'2025-12-04', b'met

26/01/09 23:13:58 WARN util.JavaUtils: Attempt to delete using native Unix OS command failed for path = /tmp/blockmgr-914d484d-16c7-4dca-8b1c-5a53da5373ec. Falling back to Java IO way
java.io.IOException: Failed to delete: /tmp/blockmgr-914d484d-16c7-4dca-8b1c-5a53da5373ec
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:171)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:110)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:91)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1141)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:182)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:178)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.fo