In [52]:
import pandas as pd
import numpy
from google.cloud import storage
import gcsfs

fs = gcsfs.GCSFileSystem(project='tron-argolis', token='/home/sockcop/k/k.json')

with fs.open('gs://tron-argolis-dataset/kagglev2-may-2016.csv') as f:
    gcs_df = pd.read_csv(f)

gcs_df.head()

Unnamed: 0,PatientId,AppointmentID,Gender,ScheduledDay,AppointmentDay,Age,Neighbourhood,Scholarship,Hipertension,Diabetes,Alcoholism,Handcap,SMS_received,No-show
0,29872500000000.0,5642903,F,2016-04-29T18:38:08Z,2016-04-29T00:00:00Z,62,JARDIM DA PENHA,0,1,0,0,0,0,No
1,558997800000000.0,5642503,M,2016-04-29T16:08:27Z,2016-04-29T00:00:00Z,56,JARDIM DA PENHA,0,0,0,0,0,0,No
2,4262962000000.0,5642549,F,2016-04-29T16:19:04Z,2016-04-29T00:00:00Z,62,MATA DA PRAIA,0,0,0,0,0,0,No
3,867951200000.0,5642828,F,2016-04-29T17:29:31Z,2016-04-29T00:00:00Z,8,PONTAL DE CAMBURI,0,0,0,0,0,0,No
4,8841186000000.0,5642494,F,2016-04-29T16:07:23Z,2016-04-29T00:00:00Z,56,JARDIM DA PENHA,0,1,1,0,0,0,No


In [53]:
print(gcs_df['No-show'] == 'Yes')
gcs_df['output_label'] = (gcs_df['No-show'] == 'Yes').astype('int')

0         False
1         False
2         False
3         False
4         False
          ...  
110522    False
110523    False
110524    False
110525    False
110526    False
Name: No-show, Length: 110527, dtype: bool


In [57]:
print(gcs_df['ScheduledDay'])
gcs_df['ScheduledDay'] = pd.to_datetime(gcs_df['ScheduledDay'], format='%Y-%m-%dT%H-%M-%SZ', errors='coerce')
print(gcs_df['ScheduledDay'])
assert gcs_df.ScheduledDay.isnull().sum() == 0, 'missing ScheduledDay dates'

0        NaT
1        NaT
2        NaT
3        NaT
4        NaT
          ..
110522   NaT
110523   NaT
110524   NaT
110525   NaT
110526   NaT
Name: ScheduledDay, Length: 110527, dtype: datetime64[ns]
0        NaT
1        NaT
2        NaT
3        NaT
4        NaT
          ..
110522   NaT
110523   NaT
110524   NaT
110525   NaT
110526   NaT
Name: ScheduledDay, Length: 110527, dtype: datetime64[ns]


AssertionError: missing ScheduledDay dates

In [42]:
def calc_prev(y):
    return (sum(y)/len(y))

calc_prev(gcs_df.output_label.values)

0.20193255946510807

In [44]:
print((gcs_df['ScheduledDay']>gcs_df['AppointmentDay']).sum())

38568


In [59]:
import kfp
from kfp.v2.dsl import component
import os

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/sockcop/k/k.json'
client = kfp.Client(host='https://11f0f736521255cb-dot-us-central1.pipelines.googleusercontent.com')

@component(packages_to_install=['gcsfs', 'pandas', 'numpy'])
def data_preprocess(dataset: str):
    import pandas as pd
    import numpy

    df = pd.read_csv(dataset)
    df['output_label'] = (df['No-show'] == 'Yes').astype('int')
    df['ScheduledDay'] = pd.to_datetime(df['ScheduledDay'], 
                                          format = '%Y-%m-%dT%H:%M:%SZ', 
                                          errors = 'coerce') 
    df['AppointmentDay'] = pd.to_datetime(df['AppointmentDay'], 
                                          format = '%Y-%m-%dT%H:%M:%SZ', 
                                          errors = 'coerce')
    
    print(df['ScheduledDay'])

    assert df.ScheduledDay.isnull().sum() == 0, 'missing ScheduledDay dates'
    assert df.AppointmentDay.isnull().sum() == 0, 'missing AppointmentDay dates'
    
    df['AppointmentDay'] = df['AppointmentDay'] +pd.Timedelta('1d') - pd.Timedelta('1s')
    
    df['ScheduledDay'] = df['ScheduledDay'].dt.year
    df['ScheduledDay_month'] = df['ScheduledDay'].dt.month
    df['ScheduledDay_week'] = df['ScheduledDay'].dt.week
    df['ScheduledDay_day'] = df['ScheduledDay'].dt.day
    df['ScheduledDay_hour'] = df['ScheduledDay'].dt.hour
    df['ScheduledDay_minute'] = df['ScheduledDay'].dt.minute
    df['ScheduledDay_dayofweek'] = df['ScheduledDay'].dt.dayofweek
    df['AppointmentDay_year'] = df['AppointmentDay'].dt.year
    df['AppointmentDay_month'] = df['AppointmentDay'].dt.month
    df['AppointmentDay_week'] = df['AppointmentDay'].dt.week
    df['AppointmentDay_day'] = df['AppointmentDay'].dt.day
    df['AppointmentDay_hour'] = df['AppointmentDay'].dt.hour
    df['AppointmentDay_minute'] = df['AppointmentDay'].dt.minute
    df['AppointmentDay_dayofweek'] = df['AppointmentDay'].dt.dayofweek
    
    df = df.sample(n = len(df), random_state = 42)
    df = df.reset_index(drop = True)
    
    df_valid = df.sample(frac = 0.3, random_state = 42)
    df_train = df.drop(df_valid.index)
    
    print('Valid prevalence(n = %d):%.3f'%(len(df_valid),calc_prevalence(df_valid.output_label.values)))
    print('Train prevalence(n = %d):%.3f'%(len(df_train), calc_prevalence(df_train.output_label.values)))
    
@kfp.dsl.pipeline(name='operation')
def pipeline_op():
    test_task = data_preprocess('gs://tron-argolis-dataset/kagglev2-may-2016.csv')

In [60]:
client.create_run_from_pipeline_func(pipeline_op, 
                                     arguments={}, 
                                     mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE)



RunPipelineResult(run_id=2e9da284-eda9-4c40-aaf2-ebe8cb14c2fd)

In [27]:
!cat /home/sockcop/k/k.json

{
  "type": "service_account",
  "project_id": "tron-argolis",
  "private_key_id": "a83412b280c7d36b41205ba36655c237c148f105",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCw41dYNmtScfA0\nl/CqyvBSbNY9oR0+j5n0GF9xwPPD0xRyTfrpizPMWyXGpYiYbTeQGsKh1VHZUT8j\niK1NdaeszNKYokZl2vlkEyZizdmLQPUmToVAyfagXz3bBPo5XODQ4rCYMuULGVUQ\nYhN8sX26ck9c0/2PEZKyhPAxY9rvA4+CXmru5Bxmb+zxEU+Vt4Ct+H3ZQEQwsMsB\ndeKRLEatg6DgV6HRfRI85IfaGPxL8n2bKHAZPsUkDMzJ57tha2LsgKgImlgmjv8V\nZDQISqlJNmynjvyxVO63/P+kpKA7MfvDaQamRBmopo0ZuXT2TBuFPAv5MWelcfz9\n6YZJ+0txAgMBAAECggEANIZlZtPN/Y7IwY1GkSiuwQMBgQ5o9S1GDWX5XXlqyQDS\nRvjh41yK9okwR/Lag0yXHarclZqW8d1+zqnksCYaMqUled5h4hfqSy2mjdtLWF7j\nDMtvJSRzn/54CyPIu6TZOx29S4x9V9TfXfyJhdLcnzMXXtyyI7wXn/v6qOfaWHNq\n2HvqJKayEx/Zzf0OY5Ts60jcz7F3acWSlMlw2aL8V1Bm1uIdDw65oTuJa8dX+xBb\nr+/O//9u3m50hUNei34SJK2KPFVgBYImd5se+uiJ1h3tdfZBeVgFV8zlBhpokEPS\nTmoXQXXOReP32gMC/qMaN0smWfMhPWPKS3kqOBhZYwKBgQDoa5M0pcu3qM6vmlid\nxDwDtbgotrSPpReMOEqLLkwXKWk7BtYwHX