In [0]:
# Display the Spark version of the attached cluster (assumes `sc` is the notebook-provided SparkContext).
sc.version

In [0]:
#install required python libraries and load them
#%run ./install_python_package
%pip install bs4
%pip install requests
%pip install wget
%pip install pyspark

In [0]:
import numpy as np
import pandas as pd
import requests
import zipfile
from bs4 import BeautifulSoup
import os
import requests
from os.path import basename
import gzip
import shutil
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.types as st
from pyspark.sql.types import StructType,StructField, StringType

In [0]:
### Collecting all compressed file download links
def get_download_links(rooturl, key):
    """Scrape ``rooturl`` for ``.gz`` links and return them as a Spark DataFrame.

    Args:
        rooturl: Directory-listing URL. Each ``href`` is appended to it verbatim,
            so it is expected to end with ``/``.
        key: Query string (e.g. ``'?key=...'``) appended to every link.

    Returns:
        A Spark DataFrame with a single string column ``url_link``; it is empty
        (but still has that schema) when the page contains no ``.gz`` links.

    Raises:
        requests.HTTPError: if the listing page returns an error status.
    """
    extension = 'gz'
    response = requests.get(rooturl, timeout=60)
    # Fail loudly instead of silently parsing an error page into zero links.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    zip_file_urls = []
    for link in soup.find_all('a'):
        sub_url = link.get('href')
        # Anchors without an href attribute yield None; skip them.
        if sub_url and sub_url.endswith(extension):
            zip_file_urls.append(rooturl + sub_url + key)

    # Explicit schema: Spark cannot infer a schema from an empty list of links.
    url_schema = StructType([StructField("url_link", StringType(), True)])
    return sqlContext.createDataFrame([(url,) for url in zip_file_urls], schema=url_schema)
  
    

In [0]:
## Sanity check: list the cached URL list that get_file_url_links_spark writes under
## /mnt/BlockChain/Blocks/Bitcoin-cash/ (presumably raises if it has not been written yet).
## Previously recorded output: [FileInfo(path='dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/', name='Bitcoin-cash/', size=0)]
dbutils.fs.ls("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/url_list.csv")


In [0]:
##%fs rm -r "/mnt/BlockChain/Blocks/Bitcoin-cash/url_list.csv"

In [0]:
## Get compressed file links for Bitcoin-Cash Block data
def get_file_url_links_spark(url_file_path, root_url, key):
  """Return the download URLs as a Spark DataFrame, using a cached CSV when possible.

  Reads ``url_file_path + "url_list.csv"``. If that cache is missing, unreadable
  or empty, scrapes ``root_url`` with ``get_download_links`` and (over)writes the
  cache.

  Args:
    url_file_path: Folder (ending in ``/``) holding the ``url_list.csv`` cache.
    root_url: Listing page passed to ``get_download_links``.
    key: Query string appended to every link.

  Returns:
    Spark DataFrame with a single string column ``url_link``.
  """
  url_list_path = url_file_path + "url_list.csv"
  url_schema_load = st.StructType([st.StructField('url_link', st.StringType(), True)])
  try:
    # The cache is written with a header row, so it must be read with header=True;
    # otherwise the literal "url_link" header line comes back as a bogus URL row.
    spark_url_df = spark.read.csv(url_list_path, schema=url_schema_load, header=True)
    url_count = spark_url_df.count()  # count once: every count() rescans the file
    print(url_count)
    if url_count > 0:
      return spark_url_df
  except Exception as ex:  # presumably an AnalysisException when the cache does not exist yet
    print("Url file not found")
    print(ex)

  spark_url_df = get_download_links(root_url, key)
  # overwrite: the cache may exist but be empty, and the default save mode would then fail.
  spark_url_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(url_list_path)
  return spark_url_df


In [0]:
def get_bitcoin_cash_blocks_file_url_links(
    url_file_path="/mnt/BlockChain/Blocks/Bitcoin-cash/",
    key=None,
    bc_cash_blocks_root='https://gz.blockchair.com/bitcoin-cash/blocks/'):
  """Return the Bitcoin-cash block dump URLs as a Spark DataFrame (``url_link`` column).

  Args:
    url_file_path: DBFS folder holding the ``url_list.csv`` cache.
    key: Full query string appended to each link (e.g. ``'?key=...'``). When
      None, it is built from the ``BLOCKCHAIR_API_KEY`` environment variable,
      falling back to the key that used to be hardcoded here.
    bc_cash_blocks_root: Blockchair listing page for Bitcoin-cash blocks.
  """
  if key is None:
    # NOTE(review): API keys should not live in source control. Set BLOCKCHAIR_API_KEY
    # (or read a Databricks secret) and then remove this fallback value.
    key = '?key=' + os.environ.get("BLOCKCHAIR_API_KEY", '202001ZjMvj8R3BF')
  return get_file_url_links_spark(url_file_path, bc_cash_blocks_root, key)

In [0]:
# Notebook-wide counters, incremented by downloader_new and read back in
# download_files_in_url_df. Created in the order success, fail, skip.
# NOTE(review): dl_skip is never incremented anywhere in this notebook.
dl_success, dl_fail, dl_skip = (sc.accumulator(0) for _ in range(3))

In [0]:
def _zip_file_name_from_url(url):
    """Return the last path component of ``url``, without any query string or fragment."""
    from urllib.parse import urlsplit
    return os.path.basename(urlsplit(url).path)


def downloader_new(row, timeout=300):
    """Download one ``.gz`` file into ``ZipDownloads/`` and extract it.

    Used with ``DataFrame.foreach``. Updates the ``dl_success`` / ``dl_fail``
    accumulators. Appends ``"<file name>,0"`` (success) or ``"<file name>,1"``
    (failure) to ``/databricks/driver/ZipDownloads/download_failures.txt``.

    Args:
        row: Row-like object exposing the download URL as ``row["url_link"]``.
        timeout: Seconds ``requests`` waits for the server to respond / send data.

    NOTE(review): under ``foreach`` this runs on executors. On a multi-node
    cluster the files may land on worker disks rather than the driver's; confirm
    the cluster layout.
    """
    zip_file_name = ""
    download_success = True
    with open("/databricks/driver/ZipDownloads/download_failures.txt", "a") as log_file:
        try:
            zip_url = row["url_link"]
            resp = requests.get(zip_url, timeout=timeout)
            # An HTTP error page must not be written out as a .gz archive.
            resp.raise_for_status()
            # resp.url is the final URL after redirects. Take its path basename so
            # the "?key=..." query is dropped without slicing on a find() that may
            # return -1 (which used to cut off the last character of the name).
            zip_file_name = _zip_file_name_from_url(resp.url)
            with open("ZipDownloads/" + zip_file_name, 'wb') as zip_file:
                zip_file.write(resp.content)
            extract_TSV_File(zip_file_name)
            dl_success.add(1)
        except Exception as ex:
            print(ex)
            download_success = False
            dl_fail.add(1)
        status = "0" if download_success else "1"
        log_file.write(zip_file_name + "," + status + "\n")

In [0]:
def extract_TSV_File(ZipFileName,
                     src_dir="ZipDownloads/",
                     dest_dir="ZipDownloads/Extracts/",
                     log_path="/databricks/driver/Extracts/ZipDownloads/Extracts/Extract_failures.txt",
                     block_size=65536):
  """Decompress ``src_dir/ZipFileName`` into ``dest_dir`` (name without ``.gz``).

  Files whose extension is not ``.gz`` are ignored. For ``.gz`` files, one line
  ``"<output name>,0"`` (success) or ``"<output name>,1"`` (failure) is appended
  to ``log_path``. On failure the partially written output is deleted, so later
  ``*.tsv`` globs do not pick up a truncated file.

  Args:
    ZipFileName: Bare file name of the archive inside ``src_dir``.
    src_dir: Folder containing the archive.
    dest_dir: Folder that receives the decompressed file.
    log_path: Log file. Its parent folder is created if missing.
    block_size: Copy buffer size in bytes.

  Returns:
    True on success, False on extraction failure, None for non-``.gz`` input.
  """
  dest_file_name, ext = os.path.splitext(ZipFileName)
  if ext != ".gz":
    # Nothing to do. Note that no log file is opened (and leaked) here.
    return None

  dest_file_name_full = os.path.join(dest_dir, dest_file_name)
  extract_success = True
  try:
    with gzip.open(os.path.join(src_dir, ZipFileName), 'rb') as s_file, \
        open(dest_file_name_full, 'wb') as d_file:
      shutil.copyfileobj(s_file, d_file, block_size)
  except Exception as ex:
    print("Error in extracting")
    extract_success = False
    print(ex)
    if os.path.exists(dest_file_name_full):
      os.remove(dest_file_name_full)

  # No visible cell creates the default log folder, so open() would raise
  # FileNotFoundError without this.
  # NOTE(review): the default path was probably meant to be under
  # /databricks/driver/ZipDownloads/Extracts/ — confirm.
  log_dir = os.path.dirname(log_path)
  if log_dir:
    os.makedirs(log_dir, exist_ok=True)
  with open(log_path, "a") as log_file:
    log_file.write(dest_file_name + "," + ("0" if extract_success else "1") + "\n")
  return extract_success

In [0]:
###Extract tsv files from gz compressed files
def extract_load_into_dataframe(file_directory, bitcastdf, block_size=65536):
  """Decompress every ``.gz`` file in ``file_directory`` and load each as a TSV.

  Each ``<name>.gz`` (processed in sorted order) is decompressed to ``<name>`` in
  the same folder, the archive is deleted, and the result is parsed with
  ``pandas.read_csv`` using a tab separator. Files that fail are reported and
  skipped.

  Args:
    file_directory: Folder to scan (not recursive).
    bitcastdf: pandas DataFrame the loaded rows are appended to; not modified.
    block_size: Copy buffer size in bytes.

  Returns:
    A new DataFrame with ``bitcastdf`` followed by every loaded TSV, or
    ``bitcastdf`` itself when nothing was loaded.
  """
  loaded_frames = []
  for file_name in sorted(os.listdir(file_directory)):
    dest_file_name, ext = os.path.splitext(file_name)
    src_path = os.path.join(file_directory, file_name)
    if ext != ".gz" or not os.path.isfile(src_path):
      continue
    dest_path = os.path.join(file_directory, dest_file_name)
    try:
      with gzip.open(src_path, 'rb') as s_file, open(dest_path, 'wb') as d_file:
        shutil.copyfileobj(s_file, d_file, block_size)
      # Delete the archive only after both handles are closed and the copy succeeded.
      os.remove(src_path)
      loaded_frames.append(pd.read_csv(dest_path, sep='\t', lineterminator='\n', names=None))
    except Exception as ex:
      print("Error in extracting")
      print(ex)
  if not loaded_frames:
    return bitcastdf
  # pd.concat replaces DataFrame.append (removed in pandas 2.0), whose result was discarded here.
  return pd.concat([bitcastdf, *loaded_frames])

In [0]:
def create_bc_block_data_folders():
  """Create the DBFS folder layout for Bitcoin-cash block-level data.

  Makes the base folder plus its TSVData and ZipFIles sub-folders, in that order.

  Returns:
    The DBFS folder path (with trailing slash) that receives extracted TSV files.
  """
  bc_block_folders = (
    "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/",
    "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData",
    "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/ZipFIles",
  )
  for folder in bc_block_folders:
    dbutils.fs.mkdirs(folder)
  return "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData/"

In [0]:
def create_download_folders():
  """Create the driver-local staging folders for downloads and their extracts."""
  for staging_folder in ("file:/databricks/driver/ZipDownloads",
                         "file:/databricks/driver/ZipDownloads/Extracts"):
    dbutils.fs.mkdirs(staging_folder)

In [0]:
def cleanup_driver_location():
  """Remove the driver-local staging folders left over from a previous run.

  The Extracts folder lives inside ZipDownloads, so the second removal is
  presumably a no-op after the first; it is kept to mirror the original calls.
  """
  for stale_folder in ("file:/databricks/driver/ZipDownloads/",
                       "file:/databricks/driver/ZipDownloads/Extracts/"):
    dbutils.fs.rm(stale_folder,  True)

In [0]:
### Create sql table
# Register the URL list as a temp view before querying it. The view used to be
# created only by a commented-out line referencing a variable defined in a later
# cell, so this query failed on a fresh cluster. get_bitcoin_cash_blocks_file_url_links
# reuses the cached url_list.csv when it exists.
get_bitcoin_cash_blocks_file_url_links().createOrReplaceTempView("BC_Block_Url")
q = """select *
from BC_Block_Url limit 10
"""
sub_urldf = spark.sql(q)
sub_urldf.show(2)

In [0]:
### Set up download locations on the driver: remove any previous run's
### ZipDownloads folder, then recreate ZipDownloads/ and ZipDownloads/Extracts/.
cleanup_driver_location()
create_download_folders()

In [0]:
# List the driver-local download folder (expected to hold only Extracts/ right after setup).
dbutils.fs.ls("file:/databricks/driver/ZipDownloads/")

In [0]:
### Create the DBFS folder structure under /mnt for Bitcoin-cash block data
### (the displayed value is the TSVData destination path returned by the function).
create_bc_block_data_folders()

In [0]:
### Get the URL links of the compressed files to download
### (read from the cached url_list.csv when present, otherwise scraped from the listing page).
spark_url_df = get_bitcoin_cash_blocks_file_url_links()

In [0]:
### Execute download command
def download_files_in_url_df(url_df):
  """Run downloader_new on every row of ``url_df``, print the wall time and return the counters.

  Args:
    url_df: Spark DataFrame with a ``url_link`` column.

  Returns:
    ``[dl_success.value, dl_skip.value, dl_fail.value]`` after the run. The
    accumulators are notebook-global, so the counts are cumulative across calls.
  """
  import time
  start = time.perf_counter()
  # Plain timing instead of an IPython %time magic inside a function body.
  url_df.foreach(downloader_new)
  print(f"Wall time: {time.perf_counter() - start:.1f} s")
  # Previously this list was built and discarded, so the function returned None.
  return [dl_success.value, dl_skip.value, dl_fail.value]

In [0]:
### Move downloaded compressed and TSV files from the driver to the DBFS folders
def movedownloads_bc_block_to_dbfs():
  """Move driver-local downloads into the Bitcoin-cash DBFS folders.

  The Extracts folder (TSV files) goes to TSVData first. The remaining
  ZipDownloads content (the .gz archives and the download log) then goes to
  ZipFIles.
  """
  planned_moves = [
    ("file:/databricks/driver/ZipDownloads/Extracts", "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData/"),
    ("file:/databricks/driver/ZipDownloads/", "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/ZipFIles"),
  ]
  for source_dir, target_dir in planned_moves:
    dbutils.fs.mv(source_dir, target_dir, recurse=True)

In [0]:
## Entry counts of the DBFS archive (ZipFIles) and TSV folders. Both values are
## returned as one tuple; a bare first expression would be discarded by the notebook.
(len(dbutils.fs.ls("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/ZipFIles/")),
 len(dbutils.fs.ls("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData")))

In [0]:
## Code to debug: run the downloader on one URL row directly on the driver.
## Note that this also bumps the dl_* accumulators and appends to the download log.
DEBUG_ROW_INDEX = 2
df_rows = spark_url_df.limit(DEBUG_ROW_INDEX + 1).collect()
if len(df_rows) > DEBUG_ROW_INDEX:
  downloader_new(df_rows[DEBUG_ROW_INDEX])
else:
  print(f"Only {len(df_rows)} url rows available; nothing to debug")

In [0]:
### Load extracted TSV files as raw text lines
def Load_TSV_into_dataframe(dbfs_file_path):
  """Read the file(s) matching ``dbfs_file_path`` with ``sc.textFile``.

  Despite the name, the result is an RDD of text lines, not a DataFrame; no
  TSV parsing or header handling is done.
  """
  return sc.textFile(dbfs_file_path)
    
    

In [0]:
# Glob matching the extracted Blockchair Bitcoin-cash block TSVs on DBFS.
# NOTE(review): the load cells further down use their own literal "*.tsv" paths instead of this variable.
tsv_files_path = "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData/blockchair_bitcoin-cash_blocks_*.tsv"


In [0]:
## Download log written by downloader_new ("<file name>,0" = ok, ",1" = failed), moved to DBFS.
download_log_path = "dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/ZipFIles/download_failures.txt"
dbutils.fs.ls(download_log_path)  # listing result unused; presumably an existence check
extract_df = spark.read.csv(download_log_path)
### Expose the log to SQL as a temp view
extract_df.createOrReplaceTempView("extract_log_table")


In [0]:
## One-column DataFrame of every path currently in the DBFS TSVData folder.
row_data = dbutils.fs.ls("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData/")
tsv_file_path = list(map(lambda file_entry: file_entry[0], row_data))  # first field of each listing entry
tsv_file_df = spark.createDataFrame(tsv_file_path, StringType())

In [0]:
# NOTE(review): repeats the last statement of the previous cell and rebuilds the same
# DataFrame from tsv_file_path; this cell can be removed.
tsv_file_df = spark.createDataFrame(tsv_file_path, StringType())

In [0]:
## Raw text lines of every TSV (RDD) plus a header-aware, schema-inferred DataFrame of the same files.
bc_block_tsv_rdd = sc.textFile("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData/*.tsv")
bc_block_tsv_rdd_Df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('delimiter', '\t')
    .option('inferschema', 'true')
    .load("/mnt/BlockChain/Blocks/Bitcoin-cash/TSVData/*.tsv")
)


In [0]:
# Number of data rows across all loaded TSV files (a Spark action over every file).
bc_block_tsv_rdd_Df.count()

In [0]:
# Column names taken from the TSV header row.
bc_block_tsv_rdd_Df.columns

In [0]:
## Create a CSV file with a subset of the block columns.
## mode("overwrite") keeps the cell re-runnable: with the default save mode the write
## fails once the output path exists.
bc_block_export_columns = ["ID", "time", "size", "median_time", "guessed_miner", "difficulty",
                           "fee_total", "fee_total_usd", "reward", "reward_usd"]
(bc_block_tsv_rdd_Df
   .select(*bc_block_export_columns)
   .coalesce(1)
   .write
   .mode("overwrite")
   .option("header", "true")
   .csv("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/bc_block_sub_df_exported.csv"))

In [0]:
#dbutils.fs.rm("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/bc_block_sub_df_exported.csv", recurse = True)

In [0]:
## (CLI alternative: databricks fs cp)
## Copy the exported CSV to /FileStore so it can be downloaded with a web browser.
## NOTE(review): the export is presumably a Spark output folder of part files (hence recurse=True);
## the browser download needs the individual part file's path — confirm.
dbutils.fs.cp("dbfs:/mnt/BlockChain/Blocks/Bitcoin-cash/bc_block_sub_df_exported.csv", "dbfs:/FileStore/bc_block_sub_df_exported.csv", recurse = True)

In [0]:
## Distinct guessed_miner values, deduplicated in Spark and collected to the driver as a Python list.
## (collect() returns a plain list of Rows, which has no .unique() method.)
[row["guessed_miner"] for row in bc_block_tsv_rdd_Df.select("guessed_miner").distinct().collect()]