In [1]:
# For PySpark
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import (StructField, StructType, StringType, IntegerType, FloatType, DateType)
from pyspark.sql.functions import lag, col, concat, lit
from pyspark.sql.functions import *

# For Time Logging
from contextlib import contextmanager
import logging
import time
import datetime

# For error checks
import sys

# For randomizing queries
import random

# Print Function name
import inspect

In [2]:
# Setup: benchmark configuration read by the cells below

# --- Input location --------------------------------------------------------
file_loc = "/dbfs"
file_prefix = "/mnt/blob/datasets/"
file_name = "dataset_10MB"

# --- Run behaviour ---------------------------------------------------------
debug, randomize = 0, 1
numBenchmarkRuns = 5 # 5 is the default

# --- Machine the benchmark runs on: 1 for Max, 2 for Nikhil -----------------
machineID = 1

# --- Total Number of Queries = 36 -------------------------------------------
numQueries = 36

In [3]:
# Auto calculations (DO NOT CHANGE)
# Directory that receives the time logs. It is defined once so that the
# directory created here and the directory the log file is written to cannot
# drift apart, and it is absolute like every other DBFS path in this notebook.
timelog_dir = "/mnt/blob/timelogs"

# Make the directories if they do not exist
dbutils.fs.mkdirs(timelog_dir)
# dbutils.fs.mkdirs("mnt/blob/datasets") # Should exist since we have already created the dataset

# One log file per notebook run, named after the dataset and the start time
# (minute resolution).
now = datetime.datetime.now()
start_of_run  = now.strftime("%Y%m%d_%H%M")
logfilename = timelog_dir + "/PySpark_" + file_name + "_" + start_of_run + ".csv"
# Same file addressed through the file_loc prefix ("/dbfs") from the setup
# cell; this is the path handed to Python's open() in time_usage.
logfilename_withDBFS = file_loc + logfilename

In [4]:
@contextmanager
def time_usage(runID, name=""):
    """Time the enclosed ``with`` block and append the result to the time log.

    runID: identifier of the benchmark run; written as one CSV field.
    name:  label of the timed operation. It is written verbatim, so a comma
           inside it (e.g. "Row Operation, Filter") produces two CSV fields.

    The elapsed time is also reported through ``logging.info``. When the
    enclosed block raises, nothing is logged or written because the code after
    ``yield`` is not reached.

    Reads the notebook-level names ``randomize``, ``file_name``, ``machineID``
    and ``logfilename_withDBFS``.
    """
    # default_timer is the clock the standard library recommends for timing
    # code; unlike time.time() it is not meant to represent wall-clock time.
    from timeit import default_timer

    start = default_timer()
    yield
    # Keep ten decimal places, exactly as the log format always has.
    elapsed_seconds = float("%.10f" % (default_timer() - start))
    logging.info('%s: elapsed seconds: %s', name, elapsed_seconds)

    # https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html
    # https://stackoverflow.com/questions/49318402/read-write-single-file-in-databricks
    fields = ["PySpark",
              str(randomize),
              file_name,
              str(machineID),
              str(runID),
              name,
              str(elapsed_seconds)]
    # Every record starts with a newline, so the file begins with an empty line.
    with open(logfilename_withDBFS, "a") as file:
        file.write("\n" + ",".join(fields))

In [5]:
# Root logger level. At WARNING the logging.info timing messages emitted by
# time_usage are not shown; use the INFO line below instead to see them.
#logging.getLogger().setLevel(logging.INFO)
logging.root.setLevel(logging.WARNING)

In [6]:
print ("Loading Data")

# Main benchmark table: <file_prefix><file_name>.csv, header row, inferred types
data = spark.read.csv(file_prefix + file_name + ".csv", header=True, inferSchema=True)

# Join partner for the join queries: `data` limited to half of its row count
data_join = data.limit(int(data.count()/2))

# Extra-row tables that the "Writing N new rows" queries union onto `data`
data_100, data_1000, data_10000 = [
    spark.read.csv(file_prefix + file_name + suffix, header=True, inferSchema=True)
    for suffix in ("_add_100.csv", "_add_1000.csv", "_add_10000.csv")
]

In [7]:
# testing: display the number of rows in data_join (built in the loading cell
# as `data` limited to half of its row count)
data_join.count()

In [8]:
# Each "_add_N" table must hold exactly N rows; stop on the first mismatch.
for expected_rows, extra_rows in ((100, data_100), (1000, data_1000), (10000, data_10000)):
  if extra_rows.count() != expected_rows:
    sys.exit("Num Rows Expected was {}. Got something else.".format(expected_rows))

In [9]:
# Show the column names of the main table; the queries below refer to columns
# such as Int1, Float1, words0 and Group0 by name.
data.columns

In [10]:
# https://jaxenter.com/implement-switch-case-statement-python-138315.html

def query1(runID, debug = 0):
  """Benchmark: keep the rows with int17 > 200 and float17 < 200, then collect.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Row Operation, Filter"):
    wanted = (data['int17'] > 200) & (data['float17'] < 200)
    rows = data.filter(wanted).collect()
  if debug >= 1:
    print(rows[0])

def query2(runID, debug = 0):
  """Benchmark: keep the rows whose Group0 matches the regex "^overmeddled".

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  # Alternative pattern, currently disabled: "^troubleshot"
  pattern = "^overmeddled"
  with time_usage(runID,"Row Operation, Filter Reg Ex 1"):
    matching = data.filter(data["Group0"].rlike(pattern))
    rows = matching.collect()

def query3(runID, debug = 0):
  """Benchmark: keep the rows whose Group0 matches the regex ".{2}lf.*s$".

  The pattern asks for two arbitrary characters followed by "lf", with the
  value ending in "s".
  NOTE(review): the pattern has no leading "^" -- confirm whether "lf" is
  meant to sit exactly at the 3rd and 4th position.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print the group0 value of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  pattern = ".{2}lf.*s$"
  with time_usage(runID,"Row Operation, Filter Reg Ex 2"):
    matching = data.filter(data["Group0"].rlike(pattern))
    rows = matching.collect()
  if debug >= 1:
    print(rows[0]['group0']) # check

def query4(runID, debug = 0):
  """Benchmark: add a status_lag column holding lag(Int1) over a window ordered by Int1.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print status_lag of the second collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  # The window is built before the timer starts; only the lag + collect is timed.
  ordered_by_int1 = Window.orderBy("Int1")
  with time_usage(runID,"Row Operation, Shift (Lag)"):
    shifted = data.withColumn('status_lag', lag(col('Int1')).over(ordered_by_int1))
    rows = shifted.collect()
  if debug >= 1:
    print(rows[1]['status_lag'])
  
def query5(runID, debug = 0):
  """Benchmark: add a CumSumTotal column, the sum of Int1 over a window ordered by Int1.

  `sum` here must be the pyspark.sql.functions version brought in by the
  star import, since it is applied to a Column and followed by .over().
  See https://stackoverflow.com/questions/46982119/how-to-calculate-cumulative-sum-in-a-pyspark-table

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print CumSumTotal of the second collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  ordered_by_int1 = Window.orderBy("Int1")
  with time_usage(runID,"Row Operation, Running Sum"):
    summed = data.withColumn("CumSumTotal", sum(data['Int1']).over(ordered_by_int1))
    rows = summed.collect()
  if debug >= 1:
    print(rows[1]['CumSumTotal'])
  
def query6(runID, debug = 0):
  """Benchmark: union `data` with the 100-row table data_100 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Row Operation, Writing 100 new rows"):
    extended = data.union(data_100)
    rows = extended.collect()
  
def query7(runID, debug = 0):
  """Benchmark: union `data` with the 1000-row table data_1000 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Row Operation, Writing 1000 new rows"):
    extended = data.union(data_1000)
    extended.collect()

def query8(runID, debug = 0):
  """Benchmark: union `data` with the 10000-row table data_10000 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Row Operation, Writing 10000 new rows"):
    extended = data.union(data_10000)
    extended.collect()
    
def query9(runID, debug = 0):
  """Benchmark: sort `data` ascending by words0 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words0 of the first collected row.
  """
  print(inspect.stack()[0][3]) # Prints function name
  with time_usage(runID,"Column Operation, Sorting Asc 1 column"):
    temp = data.orderBy("words0").collect()
  # Only look at the result in debug mode, like the sibling queries. A bare
  # `temp[0]['words0']` shows nothing inside a function and raises IndexError
  # when the result is empty.
  if debug >= 1:
    print(temp[0]['words0'])
  
def query10(runID, debug = 0):
  """Benchmark: sort `data` ascending by words0..words4 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words1 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Column Operation, Sorting Asc 5 column"):
    sort_keys = ["words0", "words1", "words2", "words3", "words4"]
    rows = data.orderBy(*sort_keys).collect()
  if debug >= 1:
    print(rows[0]['words1'])
  
def query11(runID, debug = 0):
  """Benchmark: sort `data` ascending by words0..words9 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words2 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Column Operation, Sorting Asc 10 column"):
    sort_keys = ["words0", "words1", "words2", "words3", "words4",
                 "words5", "words6", "words7", "words8", "words9"]
    rows = data.orderBy(*sort_keys).collect()
  if debug >= 1:
    print(rows[0]['words2'])
  
def query12(runID, debug = 0):
  """Benchmark: sort `data` descending by words0 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words0 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Column Operation, Sorting Desc 1 column"):
    descending_words0 = data["words0"].desc()
    rows = data.orderBy(descending_words0).collect()
  if debug >= 1:
    print(rows[0]['words0'])
  
def query13(runID, debug = 0):
  """Benchmark: sort `data` descending by words0..words4 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words1 of the first collected row.
  """
  print(inspect.stack()[0][3]) # Prints function name
  with time_usage(runID,"Column Operation, Sorting Desc 5 column"):
    temp = data.orderBy(data["words0"].desc(),data["words1"].desc(),data["words2"].desc(),data["words3"].desc(),data["words4"].desc()).collect()
  # Only look at the result in debug mode, like the sibling queries. A bare
  # `temp[0]['words1']` shows nothing inside a function and raises IndexError
  # when the result is empty.
  if debug >= 1:
    print(temp[0]['words1'])
  
def query14(runID, debug = 0):
  """Benchmark: sort `data` descending by words0..words9 and collect the result.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words2 of the first collected row.
  """
  print(inspect.stack()[0][3]) # Prints function name
  with time_usage(runID,"Column Operation, Sorting Desc 10 column"):
    temp = data.orderBy(data["words0"].desc(),data["words1"].desc(),data["words2"].desc(),data["words3"].desc(),data["words4"].desc(),data["words5"].desc(),data["words6"].desc(),data["words7"].desc(),data["words8"].desc(),data["words9"].desc()).collect()
  # Only look at the result in debug mode, like the sibling queries. A bare
  # `temp[0]['words2']` shows nothing inside a function and raises IndexError
  # when the result is empty.
  if debug >= 1:
    print(temp[0]['words2'])
  
def query15(runID, debug = 0):
  """Benchmark: whole-table aggregates over five numeric columns.

  Computes sum(Int1), avg(Float1), count(Int2), min(Float3) and max(Float10)
  in one agg() call and collects the single result row.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print the collected row.
  """
  print(inspect.stack()[0][3]) # Prints function name
  # aggregate function will apply function to the whole dataframe (not grouping by a column)
  # takes a dictionary as input
  # however this may not be the best way to do this: https://stackoverflow.com/a/51855775
  with time_usage(runID,"Column Operation, Mathematical Operation on Columns"):
    temp = data.select(['Int1','Float1','Int2','Float3','Float10']).agg({'Int1':'sum','Float1':'avg','Int2':'count','Float3':'min','Float10':'max'}).collect()
  # Only look at the result in debug mode, like the sibling queries. A bare
  # `temp[0]` shows nothing inside a function.
  if debug >= 1:
    print(temp[0])

def query16(runID, debug = 0):
  """Benchmark: inner join of `data` with data_join on group0, group1, group2.

  Only runs when file_name is "dataset_10MB"; otherwise a skip message is printed.
  See http://www.learnbymarketing.com/1100/pyspark-joins-by-example/

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print group0 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function

  if file_name != "dataset_10MB":
    print ("Skipping this query as it gives memory error for larger datasets")
    return

  with time_usage(runID,"Column Operation, Inner Join 3 Columns"):
    join_keys = ['group0', 'group1', 'group2']
    rows = data.join(data_join, join_keys, how = 'inner').collect()
  if debug >= 1:
    print(rows[0]['group0'])
    
def query17(runID, debug = 0):
  """Benchmark: inner join of `data` with data_join on group0..group4.

  Only runs when file_name is "dataset_10MB" or "dataset_100MB"; otherwise a
  skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.stack()[0][3]) # Prints function name

  # Membership test instead of combining two comparisons with the bitwise `|`.
  if file_name in ("dataset_10MB", "dataset_100MB"):
    with time_usage(runID,"Column Operation, Inner Join 5 Columns"):
      temp = data.join(data_join, ['group0','group1','group2','group3','group4'], how = 'inner').collect()
  else:
    print ("Skipping this query as it gives memory error for larger datasets")

def query18(runID, debug = 0):
  """Benchmark: inner join of `data` with data_join on group0..group9.

  Only runs when file_name is "dataset_10MB" or "dataset_100MB"; otherwise a
  skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.stack()[0][3]) # Prints function name

  # Membership test instead of combining two comparisons with the bitwise `|`.
  if file_name in ("dataset_10MB", "dataset_100MB"):
    with time_usage(runID,"Column Operation, Inner Join 10 Columns"):
      temp = data.join(data_join, ['group0','group1','group2','group3','group4','group5','group6','group7','group8','group9'], how = 'inner').collect()
  else:
    print ("Skipping this query as it gives memory error for larger datasets")
    
def query19(runID, debug = 0):
  """Benchmark: left outer join of `data` with data_join on group0, group1, group2.

  Only runs when file_name is "dataset_10MB"; otherwise a skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print group1 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function

  if file_name != "dataset_10MB":
    print ("Skipping this query as it gives memory error for larger datasets")
    return

  with time_usage(runID,"Column Operation, Left Outer Join 3 Columns"):
    join_keys = ['group0', 'group1', 'group2']
    rows = data.join(data_join, join_keys, how = 'left_outer').collect()
  if debug >= 1:
    print(rows[0]['group1'])
    
def query20(runID, debug = 0):
  """Benchmark: left outer join of `data` with data_join on group0..group4.

  Only runs when file_name is "dataset_10MB" or "dataset_100MB"; otherwise a
  skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.stack()[0][3]) # Prints function name

  # Membership test instead of combining two comparisons with the bitwise `|`.
  if file_name in ("dataset_10MB", "dataset_100MB"):
    with time_usage(runID,"Column Operation, Left Outer Join 5 Columns"):
      temp = data.join(data_join, ['group0','group1','group2','group3','group4'], how = 'left_outer').collect()
  else:
    print ("Skipping this query as it gives memory error for larger datasets")
    
def query21(runID, debug = 0):
  """Benchmark: left outer join of `data` with data_join on group0..group9.

  Only runs when file_name is "dataset_10MB" or "dataset_100MB"; otherwise a
  skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.stack()[0][3]) # Prints function name

  # Membership test instead of combining two comparisons with the bitwise `|`.
  if file_name in ("dataset_10MB", "dataset_100MB"):
    with time_usage(runID,"Column Operation, Left Outer Join 10 Columns"):
      temp = data.join(data_join, ['group0','group1','group2','group3','group4','group5','group6','group7','group8','group9'], how = 'left_outer').collect()
  else:
    print ("Skipping this query as it gives memory error for larger datasets")
    
def query22(runID, debug = 0):
  """Benchmark: full outer join of `data` with data_join on group0, group1, group2.

  Only runs when file_name is "dataset_10MB"; otherwise a skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print group2 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function

  if file_name != "dataset_10MB":
    print ("Skipping this query as it gives memory error for larger datasets")
    return

  with time_usage(runID,"Column Operation, Full Outer Join 3 Columns"):
    join_keys = ['group0','group1','group2']
    rows = data.join(data_join, join_keys, how = 'outer').collect()
  if debug >= 1:
    print(rows[0]['group2'])

def query23(runID, debug = 0):
  """Benchmark: full outer join of `data` with data_join on group0..group4.

  Only runs when file_name is "dataset_10MB" or "dataset_100MB"; otherwise a
  skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.stack()[0][3]) # Prints function name

  # Membership test instead of combining two comparisons with the bitwise `|`.
  if file_name in ("dataset_10MB", "dataset_100MB"):
    with time_usage(runID,"Column Operation, Full Outer Join 5 Columns"):
      data.join(data_join, ['group0','group1','group2','group3','group4'], how = 'outer').collect()
  else:
    print ("Skipping this query as it gives memory error for larger datasets")
    
def query24(runID, debug = 0):
  """Benchmark: full outer join of `data` with data_join on group0..group9.

  Only runs when file_name is "dataset_10MB" or "dataset_100MB"; otherwise a
  skip message is printed.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.stack()[0][3]) # Prints function name
  # Membership test instead of combining two comparisons with the bitwise `|`.
  if file_name in ("dataset_10MB", "dataset_100MB"):
    with time_usage(runID,"Column Operation, Full Outer Join 10 Columns"):
      data.join(data_join, ['group0','group1','group2','group3','group4','group5','group6','group7','group8','group9'], how = 'outer').collect()
  else:
    print ("Skipping this query as it gives memory error for larger datasets")

def query25(runID, debug = 0):
  """Benchmark: split words0 into five new columns words0_0..words0_4.

  words0 is split on the regex '\\ |a|e|i|o|u' (a space or a lowercase vowel);
  item i of the split becomes column words0_i. The result is collected.
  See https://stackoverflow.com/questions/39235704/split-spark-dataframe-string-column-into-multiple-columns

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words0_0, words0_1 and words0_2 of the first row.
  """
  print(inspect.stack()[0][3]) # Prints function name
  with time_usage(runID,"Column Operation, Split 1 Column into 5"):
    split_col = split(data["words0"], '\\ |a|e|i|o|u')
    # A stray `split_col.getItem(1)` whose result was discarded used to sit
    # here inside the timed region; it has been dropped.
    data_new = data.withColumn("words0_0", split_col.getItem(0))
    data_new = data_new.withColumn("words0_1", split_col.getItem(1))
    data_new = data_new.withColumn("words0_2", split_col.getItem(2))
    data_new = data_new.withColumn("words0_3", split_col.getItem(3))
    data_new = data_new.withColumn("words0_4", split_col.getItem(4))
    data_new = data_new.collect()
  if debug >= 1:
    print (data_new[0]['words0_0'])
    print (data_new[0]['words0_1'])
    print (data_new[0]['words0_2'])

def query26(runID, debug = 0):
  """Benchmark: split words0 into ten new columns words0_0..words0_9.

  words0 is split on the regex '\\ |a|e|i|o|u'; item i of the split becomes
  column words0_i. The result is collected.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words0 and a selection of the new columns of the
           first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Column Operation, Split 1 Column into 10"):
    pieces = split(data["words0"], '\\ |a|e|i|o|u')
    widened = data
    for position in range(10):
      widened = widened.withColumn("words0_" + str(position), pieces.getItem(position))
    rows = widened.collect()
  if debug >= 1:
    for shown in ('words0', 'words0_1', 'words0_2', 'words0_3',
                  'words0_7', 'words0_8', 'words0_9'):
      print (rows[0][shown])
  
def query27(runID, debug = 0):
  """Benchmark: add column words0m1 = words0 + " " + words1 and collect.

  See https://www.edureka.co/community/2280/concatenate-columns-in-apache-spark-dataframe

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print words0m1 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Column Operation, Merge 2 columns into 1"):
    joined_words = concat(col("words0"), lit(" "), col("words1"))
    rows = data.withColumn("words0m1", joined_words).collect()
  if debug >= 1:
    print(rows[0]['words0m1'])

def query28(runID, debug = 0):
  """Benchmark: add column words0m1, the space-separated concat of words0..words4.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Column Operation, Merge 5 columns into 1"):
    # col, " ", col, " ", ... with no trailing separator
    parts = []
    for word_column in ("words0", "words1", "words2", "words3", "words4"):
      if parts:
        parts.append(lit(" "))
      parts.append(col(word_column))
    rows = data.withColumn("words0m1", concat(*parts)).collect()

def query29(runID, debug = 0):
  """Benchmark: add column words0m1, the space-separated concat of words0..words9.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Column Operation, Merge 10 columns into 1"):
    # col, " ", col, " ", ... with no trailing separator
    parts = []
    for word_column in ("words0", "words1", "words2", "words3", "words4",
                        "words5", "words6", "words7", "words8", "words9"):
      if parts:
        parts.append(lit(" "))
      parts.append(col(word_column))
    rows = data.withColumn("words0m1", concat(*parts)).collect()
  
def query30(runID, debug = 0):
  """Benchmark: group `data` by Group0 and collect count, sum, avg, min and max.

  Five separate groupBy(...).<aggregate>().collect() jobs run, in that order,
  inside one timed region.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print the first row of each of the five results.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  # GroupBy (1 group)
  with time_usage(runID,"Aggregate Operation, GroupBy 1 column"):
    results = [
      getattr(data.groupBy("Group0"), aggregate)().collect()
      for aggregate in ("count", "sum", "avg", "min", "max")
    ]
  if debug >= 1:
    for rows in results:
      print(rows[0])


def query31(runID, debug = 0):
  """Benchmark: group `data` by five columns and collect count, sum, avg, min and max.

  Grouping columns are Group0, Group2, Group4, Group6 and Group8. Five
  separate groupBy(...).<aggregate>().collect() jobs run, in that order,
  inside one timed region.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  # GroupBy (5 groups)
  with time_usage(runID,"Aggregate Operation, GroupBy 5 columns"):
    for aggregate in ("count", "sum", "avg", "min", "max"):
      grouped = data.groupBy("Group0",'Group2','Group4','Group6','Group8')
      getattr(grouped, aggregate)().collect()

def query32(runID, debug = 0):
  """Benchmark: group `data` by ten columns and collect count, sum, avg, min and max.

  Grouping columns, in order: Group0, Group2, Group4, Group6, Group8, Group1,
  Group3, Group5, Group7, Group9. Five separate
  groupBy(...).<aggregate>().collect() jobs run inside one timed region.

  runID -- run identifier handed to time_usage for the log record.
  debug -- accepted so all queries share one signature; not used here.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  # GroupBy (10 groups)
  with time_usage(runID,"Aggregate Operation, GroupBy 10 columns"):
    for aggregate in ("count", "sum", "avg", "min", "max"):
      grouped = data.groupBy("Group0",'Group2','Group4','Group6','Group8',
                             'Group1','Group3','Group5','Group7','Group9')
      getattr(grouped, aggregate)().collect()

    
def query33(runID, debug = 0):
  """Benchmark: add a dense rank of Int1 (descending) within each Group0 and collect.

  See https://stackoverflow.com/a/41662162

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print rank, group0 and int1 of the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  # GroupBy with ranking
  with time_usage(runID,"Aggregate Operation, Ranking by Group"):
    within_group = Window.partitionBy("Group0").orderBy(desc("Int1"))
    ranked = data.withColumn("rank", dense_rank().over(within_group))
    rows = ranked.collect()
  if debug >= 1:
    first_row = rows[0]
    print(first_row["rank"])
    print(first_row["group0"])
    print(first_row["int1"])
  
def query34(runID, debug = 0):
  """Benchmark: pivot on group10 with rows grouped by group0, summing float0.

  Scala syntax is identical to PySpark:
  df.groupBy("group0").pivot("group10").sum("float0").count()

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Mixed Operation, Pivot 1 Rows and 1 Column"):
    pivoted = data.groupBy("group0").pivot("group10")
    rows = pivoted.sum("float0").collect()
  if debug >= 1:
    print(rows[0])
    
def query35(runID, debug = 0):
  """Benchmark: pivot on group10 with rows grouped by group0..group4, summing float1.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Mixed Operation, Pivot 5 Rows and 1 Column"):
    pivoted = data.groupBy("group0","group1","group2","group3","group4").pivot("group10")
    rows = pivoted.sum("float1").collect()
  if debug >= 1:
    print(rows[0])
  
def query36(runID, debug = 0):
  """Benchmark: pivot on group10 with rows grouped by group0..group9, summing float2.

  runID -- run identifier handed to time_usage for the log record.
  debug -- when >= 1, print the first collected row.
  """
  print(inspect.currentframe().f_code.co_name) # name of this function
  with time_usage(runID,"Mixed Operation, Pivot 10 Rows and 1 Column"):
    pivoted = data.groupBy("group0","group1","group2","group3","group4",
                           "group5","group6","group7","group8","group9").pivot("group10")
    rows = pivoted.sum("float2").collect()
  if debug >= 1:
    print(rows[0])

def unexpected(runID, debug = 0):
  """Placeholder handler: print this function's name and "Unexpected case".

  runID -- accepted to match the query signature; not used.
  debug -- accepted to match the query signature; not used.
  """
  for line in (inspect.currentframe().f_code.co_name, "Unexpected case"):
    print(line)

# Dispatch table: query number (1-36) -> query function, plus the "whoa" key
# mapped to the `unexpected` handler.
switcher = {
  number: handler
  for number, handler in enumerate(
    [
      query1, query2, query3, query4, query5, query6,
      query7, query8, query9, query10, query11, query12,
      query13, query14, query15, query16, query17, query18,
      query19, query20, query21, query22, query23, query24,
      query25, query26, query27, query28, query29, query30,
      query31, query32, query33, query34, query35, query36,
    ],
    start=1,
  )
}
switcher["whoa"] = unexpected
 
def run_function(argument, runID, debug = 0):
    """Call the function registered under ``argument`` in ``switcher``.

    argument -- key into switcher (normally the query number).
    runID    -- run identifier forwarded to the selected function.
    debug    -- debug level forwarded to the selected function.

    Returns whatever the selected function returns. A key that is not in
    switcher falls back to ``unexpected``; a bare ``switcher.get(argument)``
    would return None and the call would fail with a TypeError.
    """
    func = switcher.get(argument, unexpected) # look up the function object
    return func(runID, debug) # call it and hand back its result
  
def runQueries(runID = 1, randomize = 0, debug = 0):
  """Run every query once for one benchmark run.

  runID     -- run identifier; printed, used to seed the shuffle, and passed
               to each query.
  randomize -- when 1, the query order is shuffled after seeding the `random`
               module with runID * 100, so a given runID always yields the
               same order.
  debug     -- debug level passed to each query.

  Query numbers run from 1 to the notebook-level numQueries.
  """
  banner = "---------------------------------"
  print (banner)
  print ("Run ID: {}".format(runID))
  print (banner)

  # range() excludes its end value, hence numQueries + 1
  order = [number for number in range(1, numQueries + 1)]
  if randomize == 1:
    random.seed(runID * 100)
    random.shuffle(order)

  for queryNum in order:
    run_function(queryNum, runID, debug)
    

In [11]:
# Run the whole query suite numBenchmarkRuns + 1 times (runID 1 .. numBenchmarkRuns + 1).
# specify numBenchmarkRuns + 2 here since we discard 1st run and python is not inclusive of last number
# NOTE(review): the first run is not discarded in this cell; presumably that
# happens when the time log is analysed -- confirm.
for runID in range(1,numBenchmarkRuns+2):
  runQueries(runID = runID,randomize = randomize, debug = debug)

In [12]:
# Schema of the time log written by time_usage, one entry per CSV field.
# 'Type' and 'Operation' both come from the `name` label, which contains a
# comma (e.g. "Row Operation, Filter"). Every field is nullable.
data_schema = [
  StructField(field_name, field_type, True)
  for field_name, field_type in (
    ('Language', StringType()),
    ('Randomize', IntegerType()),
    ('Dataset', StringType()),
    ('MachineID', IntegerType()),
    ('RunID', IntegerType()),
    ('Type', StringType()),
    ('Operation', StringType()),
    ('TimeTaken', FloatType()),
  )
]
final_struct = StructType(fields = data_schema)

# Read this run's log back through Spark and show up to 10000 records.
print(logfilename)
timelog = spark.read.csv(logfilename, schema = final_struct)
logging.getLogger().setLevel(logging.WARNING) # suppress all informational items
timelog.show(10000)

In [13]:
# Displays all files and directories directly under dbfs:/mnt/blob/
# https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html
display(dbutils.fs.ls("dbfs:/mnt/blob/"))

path,name,size
dbfs:/mnt/blob/datasets/,datasets/,0
dbfs:/mnt/blob/timelogs/,timelogs/,0


In [14]:
# List the dataset directory that file_prefix in the setup cell points at.
display(dbutils.fs.ls("dbfs:/mnt/blob/datasets"))

path,name,size
dbfs:/mnt/blob/datasets/dataset_100MB.csv,dataset_100MB.csv,121258369
dbfs:/mnt/blob/datasets/dataset_100MB_add_100.csv,dataset_100MB_add_100.csv,121049
dbfs:/mnt/blob/datasets/dataset_100MB_add_1000.csv,dataset_100MB_add_1000.csv,1208064
dbfs:/mnt/blob/datasets/dataset_100MB_add_10000.csv,dataset_100MB_add_10000.csv,12036588
dbfs:/mnt/blob/datasets/dataset_10MB.csv,dataset_10MB.csv,12036588
dbfs:/mnt/blob/datasets/dataset_10MB_add_100.csv,dataset_10MB_add_100.csv,121049
dbfs:/mnt/blob/datasets/dataset_10MB_add_1000.csv,dataset_10MB_add_1000.csv,1208064
dbfs:/mnt/blob/datasets/dataset_10MB_add_10000.csv,dataset_10MB_add_10000.csv,12036588
dbfs:/mnt/blob/datasets/dataset_200MB.csv,dataset_200MB.csv,241022871
dbfs:/mnt/blob/datasets/dataset_200MB_add_100.csv,dataset_200MB_add_100.csv,121049


In [15]:
# List the time-log directory; this run's file is the one named by logfilename.
display(dbutils.fs.ls("dbfs:/mnt/blob/timelogs"))

path,name,size
dbfs:/mnt/blob/timelogs/PySpark_dataset_100MB_20190407_0526.csv,PySpark_dataset_100MB_20190407_0526.csv,15839
dbfs:/mnt/blob/timelogs/PySpark_dataset_10MB_20190407_0546.csv,PySpark_dataset_10MB_20190407_0546.csv,17093
dbfs:/mnt/blob/timelogs/PySpark_dataset_200MB_20190406_2220.csv,PySpark_dataset_200MB_20190406_2220.csv,4316
dbfs:/mnt/blob/timelogs/PySpark_dataset_200MB_20190407_0456.csv,PySpark_dataset_200MB_20190407_0456.csv,12853
dbfs:/mnt/blob/timelogs/PySpark_dataset_300MB_20190407_0413.csv,PySpark_dataset_300MB_20190407_0413.csv,12920
dbfs:/mnt/blob/timelogs/PySpark_dataset_500MB_20190407_0246.csv,PySpark_dataset_500MB_20190407_0246.csv,12927
dbfs:/mnt/blob/timelogs/time_Scala_Random_dataset_100MB_20190407_1236.csv,time_Scala_Random_dataset_100MB_20190407_1236.csv,14281
dbfs:/mnt/blob/timelogs/time_Scala_Random_dataset_10MB_20190406_0941.csv,time_Scala_Random_dataset_10MB_20190406_0941.csv,5135
dbfs:/mnt/blob/timelogs/time_Scala_Random_dataset_10MB_20190407_1227.csv,time_Scala_Random_dataset_10MB_20190407_1227.csv,15402
dbfs:/mnt/blob/timelogs/time_Scala_Random_dataset_200MB_20190406_0952.csv,time_Scala_Random_dataset_200MB_20190406_0952.csv,291


In [16]:
# DELETING FILES (USE WITH CARE)
#dbutils.fs.rm("/mnt/blob/timelogs/PySpark_dataset_500MB_20190323_2323.csv")

In [17]:
# Moving a File
# https://forums.databricks.com/questions/14312/how-to-move-files-of-same-extension-in-databricks.html
# dbutils.fs.mv("dbfs:/mnt/blob/add_100.csv", "dbfs:/mnt/blob/datasets/.")

In [18]:
# Print a URL for this run's time log.
# NOTE(review): the workspace host and the `o=` query parameter are
# hard-coded; adjust them when running in a different workspace.
print("Open with")
print("https://dbc-b260fb76-33af.cloud.databricks.com/dbfs" + logfilename + "?o=6744756749927366")