In [1]:
!pip install pyspark

[0m

In [2]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import time
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, expr


################
# File Reading #
################

def read_config_file(filepath):
    """Parse the YAML file at ``filepath`` and return its content.

    Parameters
    ----------
    filepath : str
        Path of the YAML file to read.

    Returns
    -------
    The object produced by ``yaml.safe_load``. When the content is not valid
    YAML the error is logged via ``logging.error`` and ``None`` is returned.
    Errors raised while opening the file are not caught here.
    """
    try:
        with open(filepath, 'r') as stream:
            parsed = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        # Best effort: report the parse problem and fall through to None.
        logging.error(exc)
        return None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one.

    Parameters
    ----------
    string : str
        Text to clean.
    char : str
        The literal character (or literal substring) whose repeats are
        collapsed. It is matched literally, so regex metacharacters such as
        ``.``, ``|``, ``+`` or a backslash are safe to pass.

    Returns
    -------
    str
        ``string`` with each run of repeated ``char`` replaced by a single
        ``char``. An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # Nothing to collapse; avoids building a pattern that matches "".
        return string
    # Escape so the argument is matched literally, and group it so that the
    # quantifier applies to the whole of ``char`` rather than its last symbol.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps ``char`` literal too (a plain replacement
    # string would have its backslashes interpreted by re.sub).
    return re.sub(pattern, lambda _match: char, string)

from pyspark.sql.functions import col, lower, expr

def lower(df, table_config):
    """Check that the data file's header matches the configured column list.

    Names are compared case-insensitively and irrespective of order.

    NOTE: this function shadows ``pyspark.sql.functions.lower`` imported at
    the top of the module; the name is kept because callers use
    ``testutility.lower``.

    Parameters
    ----------
    df : object exposing ``.columns``
        Typically a Spark DataFrame; only the iterable of column names is
        read.
    table_config : dict
        Parsed config; ``table_config['columns']`` lists the expected names.

    Returns
    -------
    int
        1 when both sides hold exactly the same names, otherwise 0. On a
        mismatch the differences are printed and both lists are logged at
        INFO level.
    """
    # Only the names are needed, so lower-case them in Python rather than
    # building a renamed Spark projection: col(c) would interpret a header
    # containing "." as a nested-field reference and fail.
    actual_columns = sorted(str(name).lower() for name in df.columns)
    # str() tolerates YAML entries that were parsed as non-strings (e.g. 2023).
    expected_cols = sorted(str(name).lower() for name in table_config['columns'])

    # List equality already implies equal length.
    if actual_columns == expected_cols:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sorted so the report is deterministic from run to run.
    mismatched_columns_file = sorted(set(actual_columns).difference(expected_cols))
    print("Following File columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_cols).difference(actual_columns))
    print("Following YAML columns are not in the file uploaded", missing_YAML_file)
    logging.info(f'df columns: {actual_columns}')
    logging.info(f'expected columns: {expected_cols}')
    return 0

Overwriting testutility.py


In [3]:
%%writefile file.yaml
# Ingestion settings for the UserAchievements data file.
# file_name and file_type are joined as "<file_name>.<file_type>" by the
# notebook when it builds the source path.
file_type: csv
dataset_name: UserAchievements
file_name: UserAchievements
table_name: achievements
# Presumably the field separators of the incoming file and of the exported
# file — confirm against the cells that read and write the data.
inbound_delimiter: ","
outbound_delimiter: "|"
# Presumably the number of header rows to skip on load — TODO confirm; no
# reader of this key is visible here.
skip_leading_rows: 1
# Expected header names. The header check in testutility.py lower-cases both
# sides and sorts them, so case and order do not matter.
columns: 
    - id
    - userid
    - achievementtype
    - tier
    - tierachievementdate
    - points
    - currentranking
    - highestranking
    - totalgold
    - totalsilver
    - totalbronze

Overwriting file.yaml


In [4]:
# Parse file.yaml through the helper module and keep the result for later cells.
import testutility as util
from pyspark.sql import SparkSession, functions as f

config_data = util.read_config_file("file.yaml")
config_data  # last expression, so the notebook displays the parsed settings

{'file_type': 'csv',
 'dataset_name': 'UserAchievements',
 'file_name': 'UserAchievements',
 'table_name': 'achievements',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['id',
  'userid',
  'achievementtype',
  'tier',
  'tierachievementdate',
  'points',
  'currentranking',
  'highestranking',
  'totalgold',
  'totalsilver',
  'totalbronze']}

In [5]:
# Get (or create) the Spark session, then read the CSV with its header row,
# trimming whitespace around each value.
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = (
    spark.read
    .option('header', True)
    .option('ignoreLeadingWhiteSpace', True)
    .option('ignoreTrailingWhiteSpace', True)
    .csv('/kaggle/input/meta-kaggle/UserAchievements.csv')
)
df.show()    # print a preview table
df.head(10)  # last expression: displayed as a list of Row objects

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/10 10:25:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-------+------+---------------+----+-------------------+------+--------------+--------------+---------+-----------+-----------+
|     Id|UserId|AchievementType|Tier|TierAchievementDate|Points|CurrentRanking|HighestRanking|TotalGold|TotalSilver|TotalBronze|
+-------+------+---------------+----+-------------------+------+--------------+--------------+---------+-----------+-----------+
|3739822|     1|     Discussion|   1|         11/06/2019|     0|          null|             3|        0|          0|         14|
|3916402|     1|   Competitions|   1|         11/06/2019|     0|          null|          null|        0|          0|          0|
|3739823|   368|   Competitions|   1|         07/15/2016|   130|          null|            75|        0|          0|          0|
|3739824|   368|        Scripts|   2|         09/21/2016|    38|          2639|    

[Row(Id='3739822', UserId='1', AchievementType='Discussion', Tier='1', TierAchievementDate='11/06/2019', Points='0', CurrentRanking=None, HighestRanking='3', TotalGold='0', TotalSilver='0', TotalBronze='14'),
 Row(Id='3916402', UserId='1', AchievementType='Competitions', Tier='1', TierAchievementDate='11/06/2019', Points='0', CurrentRanking=None, HighestRanking=None, TotalGold='0', TotalSilver='0', TotalBronze='0'),
 Row(Id='3739823', UserId='368', AchievementType='Competitions', Tier='1', TierAchievementDate='07/15/2016', Points='130', CurrentRanking=None, HighestRanking='75', TotalGold='0', TotalSilver='0', TotalBronze='0'),
 Row(Id='3739824', UserId='368', AchievementType='Scripts', Tier='2', TierAchievementDate='09/21/2016', Points='38', CurrentRanking='2639', HighestRanking='2005', TotalGold='0', TotalSilver='3', TotalBronze='14'),
 Row(Id='3739825', UserId='368', AchievementType='Discussion', Tier='2', TierAchievementDate='08/30/2016', Points='143', CurrentRanking='532', HighestR

In [6]:
# read the file using config file
file_type = config_data['file_type']
source_file = '/kaggle/input/meta-kaggle/' + config_data['file_name'] + f'.{file_type}'
print(source_file)

/kaggle/input/meta-kaggle/UserAchievements.csv


In [7]:
# Same load as before, but the path now comes from the config-derived
# `source_file` instead of a literal.
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
df = (
    spark.read
    .option('header', True)
    .option('ignoreLeadingWhiteSpace', True)
    .option('ignoreTrailingWhiteSpace', True)
    .csv(source_file)
)
df.show()    # print a preview table
df.head(10)  # last expression: displayed as a list of Row objects

+-------+------+---------------+----+-------------------+------+--------------+--------------+---------+-----------+-----------+
|     Id|UserId|AchievementType|Tier|TierAchievementDate|Points|CurrentRanking|HighestRanking|TotalGold|TotalSilver|TotalBronze|
+-------+------+---------------+----+-------------------+------+--------------+--------------+---------+-----------+-----------+
|3739822|     1|     Discussion|   1|         11/06/2019|     0|          null|             3|        0|          0|         14|
|3916402|     1|   Competitions|   1|         11/06/2019|     0|          null|          null|        0|          0|          0|
|3739823|   368|   Competitions|   1|         07/15/2016|   130|          null|            75|        0|          0|          0|
|3739824|   368|        Scripts|   2|         09/21/2016|    38|          2639|          2005|        0|          3|         14|
|3739825|   368|     Discussion|   2|         08/30/2016|   143|           532|           319|   

[Row(Id='3739822', UserId='1', AchievementType='Discussion', Tier='1', TierAchievementDate='11/06/2019', Points='0', CurrentRanking=None, HighestRanking='3', TotalGold='0', TotalSilver='0', TotalBronze='14'),
 Row(Id='3916402', UserId='1', AchievementType='Competitions', Tier='1', TierAchievementDate='11/06/2019', Points='0', CurrentRanking=None, HighestRanking=None, TotalGold='0', TotalSilver='0', TotalBronze='0'),
 Row(Id='3739823', UserId='368', AchievementType='Competitions', Tier='1', TierAchievementDate='07/15/2016', Points='130', CurrentRanking=None, HighestRanking='75', TotalGold='0', TotalSilver='0', TotalBronze='0'),
 Row(Id='3739824', UserId='368', AchievementType='Scripts', Tier='2', TierAchievementDate='09/21/2016', Points='38', CurrentRanking='2639', HighestRanking='2005', TotalGold='0', TotalSilver='3', TotalBronze='14'),
 Row(Id='3739825', UserId='368', AchievementType='Discussion', Tier='2', TierAchievementDate='08/30/2016', Points='143', CurrentRanking='532', HighestR

In [22]:
# Validate the file header against the YAML config; export a gzip-compressed,
# delimited copy only when the header matches.
import testutility as util

if util.lower(df, config_data)==0:
    print("validation failed")
else:
    print("col validation passed")
    comp_file = '/kaggle/working/' + 'compressed_' + config_data['file_name']
    (
        df.write
        .format("csv")
        # Take the separator from the config instead of hard-coding "|".
        .option("delimiter", config_data['outbound_delimiter'])
        .option("compression", "gzip")
        # Replace the output of an earlier run; without an explicit mode the
        # write fails when the target directory already exists, so the cell
        # could not be re-run.
        .mode("overwrite")
        .save(comp_file)
    )

column name and column length validation passed
col validation passed


                                                                                

In [23]:
ls

__notebook_source__.ipynb  [0m[01;34mcompressed_UserAchievements[0m/  testutility.py
[01;34m__pycache__[0m/               file.yaml


In [25]:
cd compressed_UserAchievements/

/kaggle/working/compressed_UserAchievements


In [27]:
ls

_SUCCESS
part-00000-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00001-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00002-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00003-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00004-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00005-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00006-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00007-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00008-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00009-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00010-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00011-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00012-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00013-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00014-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00015-5c05fb15-9473-4f50-acdf-467842f2a106-c000.csv.gz
part-00016-5c05fb15-9473-4f50-a

In [30]:
import zipfile
import os
from IPython.display import FileLink

def zip_dir(directory = os.curdir, file_name = 'directory.zip'):
    """
    zip all the files in a directory (sub-directories included)

    Parameters
    _____
    directory: str
        directory that needs to be zipped, default is the current working
        directory

    file_name: str
        the name of the zipped file (including .zip), default is
        'directory.zip'; the archive is created inside `directory`

    Returns
    _____
    A FileLink to the archive (a hyperlink which can be used to download the
    zip file)
    """
    # Work with explicit paths instead of os.chdir(): changing the process
    # working directory leaked out of the function and made os.walk() look for
    # a nested "<directory>/<directory>" when a relative path was given.
    archive_path = os.path.normpath(os.path.join(directory, file_name))
    archive_abs = os.path.abspath(archive_path)

    # The context manager guarantees the archive is finalised and closed
    # before the link is handed out.
    with zipfile.ZipFile(archive_path, mode='w') as zip_ref:
        for folder, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(folder, file)
                # Skip only the archive being written, not every file whose
                # name happens to contain the archive name.
                if os.path.abspath(file_path) == archive_abs:
                    continue
                # Store entries relative to the zipped directory.
                zip_ref.write(file_path, arcname=os.path.relpath(file_path, directory))

    return FileLink(archive_path)

In [35]:
zip_dir()