In [1]:
import dask.dataframe as dd
import matplotlib.pyplot as plt
import os
from sklearn.metrics import r2_score
import lightgbm as lgb
import dask_ml
import dask
import pandas as pd
import numpy as np
#dask.config.set({"distributed.utils.perf.gc-fraction": 0.8})
#dask.config

In [None]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning, message=".*Sending large graph.*")


In [None]:
from dask.distributed import Client, LocalCluster
import dask.multiprocessing

# Number of single-threaded worker processes for the local cluster.
# NOTE(review): deliberately kept at 1 (presumably to bound memory); os.cpu_count()
# reports the available cores if more parallelism is wanted.
n_cores = 1

# n_cores is now actually wired into the cluster (it used to be ignored).
cluster = LocalCluster(processes=True, n_workers=n_cores, threads_per_worker=1)
client = Client(cluster)

In [None]:
# Training data is split over four folders named by percentage range:
# train0_25, train25_50, train50_75, train75_100.
folders = [f'train{lo}_{lo + 25}' for lo in range(0, 100, 25)]

# One lazy Dask DataFrame per folder, stacked row-wise into a single frame.
dfs = list(map(dd.read_parquet, folders))
data = dd.concat(dfs)

In [None]:
len(data)  # total number of rows; triggers a Dask computation

In [None]:
#numPartitions = data.npartitions
#splitPart = int(numPartitions*0.7)
#train = data.partitions[0:splitPart]
#test = data.partitions[splitPart:numPartitions]

In [2]:
# Column-name stems. Profile variables (feat60 / target60) are stored as 60 columns
# each, suffixed _0 ... _59; scalar variables (feat1 / target1) are single columns.
# Variant tried earlier: the same feat60 list without 'state_t'.
feat60 = ['state_t', 'state_q0001', 'state_q0002', 'state_q0003', 'state_u', 'state_v',
          'pbuf_ozone', 'pbuf_CH4', 'pbuf_N2O']
feat1 = ['state_ps', 'pbuf_SOLIN', 'pbuf_LHFLX', 'pbuf_SHFLX', 'pbuf_TAUX', 'pbuf_TAUY',
         'pbuf_COSZRS', 'cam_in_ALDIF', 'cam_in_ALDIR', 'cam_in_ASDIF', 'cam_in_ASDIR',
         'cam_in_LWUP', 'cam_in_ICEFRAC', 'cam_in_LANDFRAC', 'cam_in_OCNFRAC', 'cam_in_SNOWHLAND']

target60 = ['ptend_t', 'ptend_q0001', 'ptend_q0002', 'ptend_q0003', 'ptend_u', 'ptend_v']
target1 = ['cam_out_NETSW', 'cam_out_FLWDS', 'cam_out_PRECSC', 'cam_out_PRECC',
           'cam_out_SOLS', 'cam_out_SOLL', 'cam_out_SOLSD', 'cam_out_SOLLD']


def _expand(stems, n_levels=60):
    """Return ``stem_0 .. stem_{n_levels-1}`` for every stem, stem-major order."""
    return [f'{stem}_{level}' for stem in stems for level in range(n_levels)]


features60 = _expand(feat60)
allF = features60 + feat1

targets60 = _expand(target60)
allT = targets60 + target1

# Targets excluded from modelling: levels 0-11 of these stems, plus ptend_q0002 levels 12-14.
# NOTE(author): ptend_q0002_15 may need the same treatment.
targetsToDrop12 = ['ptend_q0001', 'ptend_q0002', 'ptend_q0003', 'ptend_u', 'ptend_v']
dropT = ['ptend_q0002_12', 'ptend_q0002_13', 'ptend_q0002_14'] + _expand(targetsToDrop12, 12)

dropSet = set(dropT)
allT2 = [name for name in allT if name not in dropSet]

# find corrupt files

In [None]:
import os
import dask.dataframe as dd
from dask.distributed import Client

# Reuse the client created when the cluster was set up and only start one if none
# exists. A bare Client() would launch a second local cluster and rebind `client`,
# leaving the first cluster orphaned.
try:
    client = Client.current()
except ValueError:  # raised when no client exists yet
    client = Client()

def find_parquet_files(folder):
    """Return every ``*.parquet`` file path below ``folder`` (any depth).

    Paths are ``os.path.join(dirpath, name)`` for the directories reported by
    ``os.walk``, in ``os.walk`` order. A missing folder yields an empty list.
    """
    return [
        os.path.join(dirpath, name)
        for dirpath, _subdirs, filenames in os.walk(folder)
        for name in filenames
        if name.endswith(".parquet")
    ]

# Collect the Parquet files of every training folder (previously only folders[3]
# was scanned, despite this comment).
all_parquet_files = []
for folder in folders:
    all_parquet_files.extend(find_parquet_files(folder))

corrupted_files = []

# Read each file completely. dd.read_parquet(file).head() only materialises the
# first rows of the first partition, so damage later in a file went unnoticed.
for file in all_parquet_files:
    try:
        pd.read_parquet(file)
    except Exception as e:  # any read failure marks the file as unusable
        print(f"Error reading {file}: {e}")
        corrupted_files.append(file)

if corrupted_files:
    print("The following files are corrupted or not Parquet files:")
    for corrupted_file in corrupted_files:
        print(corrupted_file)
else:
    print("No corrupted files found.")


In [None]:
n_corrupted = len(corrupted_files)  # files that failed to load in the scan
n_corrupted

In [None]:
import os

# Delete the files flagged as unreadable. Paths that are already gone are skipped,
# so re-running the cell no longer raises FileNotFoundError.
# NOTE(review): `data` was built before this cell; re-create it so its task graph
# no longer references the deleted files.
for file in corrupted_files:
    if os.path.exists(file):
        os.remove(file)


# Inspect the data

In [None]:
import logging

In [None]:
# Raise the threshold of the "distributed.utils_perf" logger to ERROR.
logging.getLogger("distributed.utils_perf").setLevel(logging.ERROR)

# Pull one target column into pandas and show its distribution.
q2_level15 = data['ptend_q0002_15'].compute()
q2_level15.hist(bins=100)

# Visualize data
- All variables look roughly stationary across the row index, so there appears to be no time dependency in the data.
- Consequently the data can simply be downsampled (random subsets of partitions, below).

In [None]:
def createHistPlt(a, f, st, save_dir='histplots'):
    """Plot a histogram and an index-vs-value scatter of column ``f`` of ``a``.

    Parameters
    ----------
    a : pandas.DataFrame
        Frame containing column ``f`` (callers pass ``data[f].compute().reset_index()``,
        which also holds an ``index`` column).
    f : str
        Name of the column to plot.
    st : str
        Suffix appended to the file name (e.g. 'feat' / 'targ').
    save_dir : str or None
        The figure is written to ``<save_dir>/<f><st>.jpg``, the path the plotting loop
        checks to skip finished columns. ``None`` disables saving.
    """
    values = a[f]
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

    # Histogram of the column only: passing the whole frame also binned the
    # 'index' column created by reset_index().
    ax1.hist(values, bins=100, edgecolor='k', alpha=0.7)
    ax1.set_ylabel('Frequency')
    ax1.set_title('Histogram of ' + f)

    # Value against row position, to eyeball drift / non-stationarity.
    ax2.scatter(a.index, values, s=1, alpha=0.7)
    ax2.set_xlabel('index')
    ax2.set_ylabel(f)
    # 3 significant digits: some columns are tiny and round(x, 2) shows them as 0.0.
    ax2.set_title(f'mean {values.mean():.3g} std {values.std():.3g}')

    if save_dir is not None:
        os.makedirs(save_dir, exist_ok=True)
        fig.savefig(os.path.join(save_dir, f + st + '.jpg'))
def createHist(a, f, st):
    """Save a raw and a standardized histogram of ``a`` to ``histplots/<f><st>.jpg``.

    The lower panel shows the z-scored values ``(a - mean) / std``; its title reports
    the raw mean and standard deviation.
    NOTE(review): presumably ``a`` is a single-column Series — confirm against callers.
    """
    m = a.mean()
    s = a.std()
    # Standardized copy. It was computed before but never plotted: the lower
    # panel repeated the raw histogram.
    b = (a - m) / s

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    ax1.hist(a, bins=100, edgecolor='k', alpha=0.7)
    ax1.set_ylabel('Frequency')
    ax1.set_title('Histogram of ' + f)

    ax2.hist(b, bins=100, edgecolor='k', alpha=0.7)
    ax2.set_xlabel('standardized ' + f)
    ax2.set_ylabel('Frequency')
    ax2.set_title('mean' + str(round(m, 2)) + ' std ' + str(round(s, 2)))

    os.makedirs('histplots', exist_ok=True)  # savefig fails if the folder is missing
    fig.savefig('histplots/' + f + st + '.jpg')

In [None]:
# temperature: distribution moves from left to right
# Plot every feature and every kept target once. Columns whose plot file already
# exists are skipped so the loop can be resumed.
# NOTE(review): the skip only takes effect if createHistPlt writes histplots/<f><suffix>.jpg.
for columns, suffix in ((allF, 'feat'), (allT2, 'targ')):
    for f in columns:
        if os.path.exists('histplots/' + f + suffix + '.jpg'):
            continue
        a = data[f].compute().reset_index()
        createHistPlt(a, f, suffix)
        # Hundreds of columns are plotted. Close each figure, otherwise every one
        # stays registered with pyplot and memory grows without bound.
        plt.close('all')

In [None]:
# Pull one target column into pandas for a closer look; `a` has 'index' and f columns.
f = 'ptend_q0002_26'
column = data[f].compute()
a = column.reset_index()


In [None]:
# tendency * 1200 + state. Presumably 1200 is the model time step in seconds (TODO confirm).
# NOTE(review): this combines the q0002 tendency with the q0001 state; confirm this is
# intended (state_q0002_26 would be the matching species).
tendency = data['ptend_q0002_26']
state = data['state_q0001_26']
s = (tendency * 1200 + state).compute().reset_index()

In [None]:
# (range, max, min) of the column. Series.max/min are vectorised and skip NaN;
# the builtin max/min iterate in Python and give order-dependent results with NaN.
col_max, col_min = a[f].max(), a[f].min()
col_max - col_min, col_max, col_min

In [None]:
# Min-max scale the column to [0, 1] using vectorised, NaN-skipping min/max.
lo, hi = a[f].min(), a[f].max()
(a[f] - lo) / (hi - lo)

In [None]:
# Sign-flipped copy of the column scaled by 1e10 (presumably to bring tiny values to O(1)).
b = a.copy()
b[f] = a[f] * -1e10

# Fine-grained histogram of `s` (tendency*1200 + state) from the cell above.
# ax.hist returns (counts, bin_edges, patches); the tuple is kept under the name `bins`.
fig, ax1 = plt.subplots(figsize=(12, 8))
bins = ax1.hist(s[0], bins=1000, edgecolor='k', alpha=0.7)

In [None]:
# (counts, bin_edges, patches) tuple returned by ax1.hist in the cell above
bins

In [None]:
# Scale the column by its minimum on a *copy*. The former `b = a` aliased `a`, so the
# assignment also overwrote a[f] and every re-run divided the data again.
b = a.copy()
# alternative tried: b[f] = np.log(1+a[f]) / np.log(1+max(a[f]))
b[f] = a[f] / a[f].min()
createHistPlt(b, f, 'targ')

In [None]:
import plotly.express as px

# Fixed random_state so the 50 % subsample, and hence the figure, is reproducible.
fig = px.histogram(a.sample(frac=0.5, random_state=42), x=f, histfunc='avg')
fig.show()

# downsample data

In [None]:
import numpy as np
np.random.seed(42)

# Shuffle all partition indices in place (seeded above), then take the first 10 %
# for training and the following 1.5 % for validation.
orig_partitions = list(range(int(data.npartitions)))
np.random.shuffle(orig_partitions)

trainSep = int(0.1 * data.npartitions)
valEnd = trainSep + int(0.015 * data.npartitions)

sampledPartIdxTrain = orig_partitions[:trainSep]
sampledPartIdxTest = orig_partitions[trainSep:valEnd]

In [None]:
tuple(len(idx) for idx in (sampledPartIdxTest, sampledPartIdxTrain))  # (validation, train) partition counts

In [None]:
import sys

# X_val is created by the validation-loading cell further down. Guard against its
# absence so a top-to-bottom run does not stop here with a NameError.
if 'X_val' in globals():
    size_in_bytes = sys.getsizeof(X_val)
    print('in mb', size_in_bytes/1000/1000)
else:
    print('X_val not loaded yet; run the validation-loading cell first')

# Baseline approach (LightGBM)
- 0.45 public score: without the temperature data (`state_t`) and without shuffling partitions between train and test.
- 0.47 public score: with the temperature data and properly shuffled partitions (2 × 100 training partitions).
- There is room for further improvement (e.g. feature engineering).

In [None]:
def convertData(partIdStart, partIdEnd, train, featuresTrain, targetFeatures, mean_values, std_values):
    """Materialise a range of partitions and z-score its feature and target columns.

    Partitions ``partIdStart:partIdEnd`` of ``train`` are computed once for the
    feature columns and once for the target columns. Each column ``c`` is then
    replaced by ``(c - mean_values[c]) / std_values[c]``.

    Returns
    -------
    (X, y) : normalised feature frame and normalised target frame.
    """
    def _load(columns):
        # Column selection first, then the partition slice, then compute.
        return train[columns].partitions[partIdStart:partIdEnd].compute()

    def _standardise(frame, columns):
        for col in columns:
            frame[col] = (frame[col] - mean_values[col]) / std_values[col]
        return frame

    X = _load(featuresTrain)
    y = _load(targetFeatures)
    return _standardise(X, featuresTrain), _standardise(y, targetFeatures)

def convertDataBack(y, pred, feature, mean_values, std_values):
    """Undo the z-scoring of one column for both the truth and the prediction.

    Each of ``y`` and ``pred`` is mapped to ``v * std_values[feature] + mean_values[feature]``.
    Returns ``(y_original_scale, pred_original_scale)``.
    """
    scale = std_values[feature]
    shift = mean_values[feature]
    return tuple(v * scale + shift for v in (y, pred))

def calcR2scoreFromConvData(y, pred, feature, mean_values, std_values):
    """R² of ``pred`` against ``y`` after both are mapped back to the original scale."""
    return r2_score(*convertDataBack(y, pred, feature, mean_values, std_values))

In [None]:
import pandas as pd

# Validation rows: each validation partition is computed on its own and the pieces
# are stacked into one pandas frame (about 260 MB per 8 partitions).
valList = [data.get_partition(int(part)).compute() for part in sampledPartIdxTest]
val = pd.concat(valList)
X_val, y_val = val[allF], val[allT2]
del val, valList

In [None]:
params = {
    'boosting_type': 'gbdt',
    'objective': 'regression',
    'metric': 'l2',
    'num_leaves': 15,
    #'learning_rate': 0.05,
    #'feature_fraction': 0.9,
    #'bagging_fraction': 0.8,
    #'bagging_freq': 5,
    'verbose': -1
}

# Train/validation R² per target, keyed by training-chunk number.
r2ScoreDict = {f: {} for f in allT2}

# One LightGBM regressor per target, trained incrementally: every chunk of
# `partPerLoop` training partitions adds 20 boosting rounds on top of the model
# saved after the previous chunk (init_model).
partPerLoop = 100
modelDir = 'individualLGBMs'
# Presumably save_model does not create missing directories; make sure they exist.
os.makedirs(modelDir + '/checkpoints', exist_ok=True)

# Chunk start offsets. The last chunk may hold fewer than partPerLoop partitions;
# the former int(len / partPerLoop) floor silently dropped those trailing partitions.
chunkStarts = range(0, len(sampledPartIdxTrain), partPerLoop)
nsplitData = len(chunkStarts)
for i, startPartIdx in enumerate(chunkStarts):
    chunkPartIdx = sampledPartIdxTrain[startPartIdx:startPartIdx + partPerLoop]
    locdata = pd.concat([data.get_partition(int(p)).compute() for p in chunkPartIdx])
    print('done preprocessing data')
    X = locdata[allF]

    for f in allT2:
        print('processing ', f)
        fileName = modelDir + '/model_' + f + '.txt'
        # Chunk 0 starts from scratch; later chunks continue the saved model.
        gbm = lgb.Booster(model_file=fileName) if i != 0 else None

        valSet = lgb.Dataset(X_val, label=y_val[f], free_raw_data=False)
        y = locdata[f]
        train_set = lgb.Dataset(X, y, free_raw_data=False)
        gbm = lgb.train(params,
                        train_set,
                        num_boost_round=20,
                        valid_sets=valSet,
                        init_model=gbm)

        predTrain = gbm.predict(X)
        predVal = gbm.predict(X_val)
        r2train = r2_score(y, predTrain)
        r2test = r2_score(y_val[f], predVal)
        r2ScoreDict[f][i] = {'train': r2train, 'test': r2test}
        print('r2 scores', r2train, r2test)

        gbm.save_model(fileName)
        gbm.save_model(modelDir + '/checkpoints/model_' + f + '_' + str(i) + '_' + str(round(r2test, 3)) + '.txt')
        del y, train_set, valSet


In [None]:
# Free the last training chunk (features and raw frame) left over from the training loop.
del X, locdata

In [None]:
# Flag targets whose validation R² dropped from one chunk to the next, i.e. where more
# data/trees hurt. Integer keys are chunk numbers; with chunks {0, 1} this is exactly
# the former "scores[1] - scores[0] < 0" check, but it no longer raises KeyError when
# there is only one chunk and it also covers later chunks.
for f in allT2:
    scores = r2ScoreDict[f]
    chunks = sorted(k for k in scores if isinstance(k, int))
    if any(scores[later]['test'] < scores[earlier]['test']
           for earlier, later in zip(chunks, chunks[1:])):
        print(f, scores)

In [None]:
import pickle

# Persist the per-target R² history next to the saved models.
scoresPath = 'individualLGBMs/r2ScoreDict.p'
with open(scoresPath, mode='wb') as handle:
    pickle.dump(r2ScoreDict, handle, pickle.HIGHEST_PROTOCOL)

## Re-evaluate the saved models on the validation set

In [None]:
# Re-score every saved model on the validation set. The model is always loaded from
# disk: the former `if i != 0 else None` read `i` leaked from whichever loop ran last,
# and gbm became None (crashing on predict) whenever that value was 0.
for f in allT2:
    print('processing ', f)
    fileName = 'individualLGBMs/model_' + f + '.txt'
    gbm = lgb.Booster(model_file=fileName)
    predVal = gbm.predict(X_val)

    r2test = r2_score(y_val[f], predVal)
    # Stored under 'reloaded' rather than 2: integer keys are training-chunk numbers,
    # and key 2 would overwrite that chunk's train/test scores.
    r2ScoreDict[f]['reloaded'] = {'test': r2test}
    print('r2 scores', r2test)

In [None]:
# Release the in-memory validation features and targets.
del X_val, y_val

## submitting

In [3]:
# Lazily open the test features and the sample submission; they are computed below.
TEST_DIR, SAMPLE_SUB_DIR = "test", 'sampleSub'
testData = dd.read_parquet(TEST_DIR)
sampleSubmissions = dd.read_parquet(SAMPLE_SUB_DIR)

In [4]:
testList = []
for i in range(testData.npartitions):
    testList.append(testData.get_partition(int(i)).compute())
test = pd.concat(testList)
X_test = test[allF]

sampSubList = []
for i in range(sampleSubmissions.npartitions):
    sampSubList.append(sampleSubmissions.get_partition(int(i)).compute())
sampleSub = pd.concat(sampSubList)

: 

In [None]:
# The sample-submission values are used as per-column multipliers on the raw model
# output. Snapshot them once: the former in-place `sampleSub[f] = sampleSub[f] * predVal`
# multiplied already-scaled predictions again on every re-run.
# NOTE(review): columns not in allT2 (dropT) keep their sample-submission values.
if 'sampleWeights' not in globals():
    sampleWeights = sampleSub.copy()

for f in allT2:
    print('processing ', f)
    fileName = 'individualLGBMs/model_' + f + '.txt'
    gbm = lgb.Booster(model_file=fileName)
    predVal = gbm.predict(X_test)
    # Positional product: requires X_test and sampleSub to have matching rows.
    assert len(predVal) == len(sampleWeights), 'test and sample-submission row counts differ'
    sampleSub[f] = sampleWeights[f] * predVal

In [None]:
# Write the weighted predictions as the submission file.
submissionPath = 'sample_sub_LGBM_baseline2_shuff_allTrainF.parquet'
sampleSub.to_parquet(submissionPath)

In [None]:
# Free the submission frame, the test data and the partition lists used to build them.
# NOTE(review): the full `test` frame is not deleted here.
del sampleSub, testData,sampleSubmissions, X_test, sampSubList, testList