In [0]:
#Importing the libraries
import requests
from pyspark.sql import DataFrame

In [0]:
#Creating a function to extract data from URL
#Loading the file in dbfs 
def get_data(url:str):
    filename = url.split('/')[-1]
    with requests.get(url, stream = True) as r:
        with open('/dbfs/{}'.format(filename),'wb') as f:
            for chunk in r.iter_content(chunk_size = 8192):
                f.write(chunk)

    return filename

In [0]:
file_name = get_data('https://datahub.io/core/glacier-mass-balance/r/glaciers.csv')

In [0]:
file_format = file_name.split('.')[-1]

In [0]:
#Reading the file from dbfs
def read_data(file_name):
    if file_format == 'csv':
        df = spark.read.format(file_format).option('header','true').load('file:/dbfs/{}'.format(file_name))
    
    return df

In [0]:
df = read_data(file_name)

In [0]:
df.createOrReplaceTempView('df')

In [0]:
# Performing the transformation operations
# Spliting the data into 2 different dataframes
def transform_data(df: DataFrame):
    spark.sql("create or replace temp view nintys as select * from df where Year like '19%' order by Year asc;")
    nintys_df = spark.sql('select * from nintys')
    spark.sql("create or replace temp view modern as select * from df where Year like '20%' order by Year asc;")
    modern_df = spark.sql('select * from modern')

    return nintys_df,modern_df

In [0]:
x,y = transform_data(df)

In [0]:
# Loading operations
# Creating the file name 
def create_file_names():
    nintys_file_namez = spark.sql("(select * from nintys order by Year asc limit 1) union (select * from nintys order by Year desc limit 1)")
    modern_file_namez = spark.sql("(select * from modern order by Year asc limit 1) union (select * from modern order by Year desc limit 1)")
    nintys_file_namez_df = nintys_file_namez.collect()
    modern_file_namez_df = modern_file_namez.collect()
    nintys_file_name = nintys_file_namez_df[0].__getitem__('Year')+"-"+nintys_file_namez_df[0].__getitem__('Year')
    modern_file_name = modern_file_namez_df[0].__getitem__('Year')+"-"+modern_file_namez_df[0].__getitem__('Year')

    return nintys_file_name,modern_file_name

In [0]:
m,n = create_file_names()

In [0]:
#Loading the files into dbfs
def write_df(file_type: str,dfs,file_names):
    for x,y in zip(dfs,file_names):
        m = x.write.mode('overwrite').format(file_type).save("/dbfs/{}.{}".format(y,file_type))
    return m

In [0]:
write_df('csv',[x,y],[m,n])

In [0]:
display(x)

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


In [0]:
display(y)

Year,Mean cumulative mass balance,Number of observations
2000,-17.727,37
2001,-18.032,37
2002,-18.726,37
2003,-19.984,37
2004,-20.703,37
2005,-21.405,37
2006,-22.595,37
2007,-23.255,37
2008,-23.776,37
2009,-24.459,37


In [0]:
# Displaying files after loading in dbfs
# Final output files
dbutils.fs.ls('/dbfs')

Out[50]: [FileInfo(path='dbfs:/dbfs/1945-1945.csv/', name='1945-1945.csv/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/dbfs/2000-2000.csv/', name='2000-2000.csv/', size=0, modificationTime=0)]