## Define core variables for data structure, asking for user input

In [8]:
import time 
import string
import random
import pyspark.sql.functions as F

User_ID_Columns = ['user_id']
User_ID_Table = 'user_id'

Personal_Details_Columns = ['user_id','name','email','date_of_birth','country',
                            'address_line_1','address_line_2','city','post_code']
Personal_Details_Table = 'personal_details'

Bank_Details_Columns = ['user_id','acc_number','sort_code','name','currency']
Bank_Details_Table = 'bank_details'

Table_Details = {User_ID_Table : User_ID_Columns,
                 Personal_Details_Table : Personal_Details_Columns,
                 Bank_Details_Table : Bank_Details_Columns}

New_User_Questions = ['Enter full name: ',
                      'Enter email address: ',
                      'Enter date of birth (yyyy-mm-dd): ', 
                      'Enter country: ',
                      'Enter address line 1: ',
                      'Enter address line 2: ',
                      'Enter city of residence: ',
                      'Enter post/zip code: '
                      ]

New_Bank_Questions = ['Enter account number: ',
                      'Enter sort code: ',
                      'Enter name on account: ', 
                      'Enter currency type: '
                      ]
  

In [111]:
def create_sample_data():
  
  ids = [['MKPXIA3809'],['ZLWJKS8140'],['SHIZCF0984'],['EMJGXM8166']]
  id_sample_data = spark.createDataFrame(ids, User_ID_Columns)
  id_sample_data.write.format('parquet').mode('overwrite').saveAsTable(User_ID_Table)
  
  personals = [('MKPXIA3809', 'Alice Jones', 'aj@gmail.com','1990-01-01', 'United Kingdom', '12 Mill Lane', '', 'Newport', 'NP111PN'), 
               ('ZLWJKS8140', 'Robert Smith', 'rs@gmail.com', '1991-02-02', 'United Kingdom', '13 Mill Lane', '', 'Newport', 'NP111PN'),
               ('SHIZCF0984', 'Charlotte Cooper', 'cc@gmail.com', '1992-03-03', 'United Kingdom', '14 Mill Lane', '', 'Newport', 'NP111PN'),
               ('EMJGXM8166', 'Daniel Price', 'dp@gmail.com', '1993-04-04', 'United Kingdom', '15 Mill Lane', '', 'Newport', 'NP111PN')]
  personal_sample_data = spark.createDataFrame(personals, Personal_Details_Columns)
  personal_sample_data.write.format('parquet').mode('overwrite').saveAsTable(Personal_Details_Table)

  banks = [('MKPXIA3809', 12345678, 203212, 'Alice Jones', 'GBP'),
           ('ZLWJKS8140', 24512315, 422321, 'Bob Smith', 'GBP'),
           ('SHIZCF0984', 76326171, 848212, 'Charlotte Cooper', 'GBP'),
           ('EMJGXM8166', 19874611, 452152, 'Daniel Price', 'GBP')]
  bank_sample_data = spark.createDataFrame(banks, Bank_Details_Columns)
  bank_sample_data.write.format('parquet').mode('overwrite').saveAsTable(Bank_Details_Table)                



## Create sample data to populate database


In [127]:
create_sample_data()

## Functions to create new ID value, check for uniqueness in table of previous ID values, and repeat until unique ID created

In [53]:
def generate_new_id_value():
  
  seed = int(10000 * (100 * time.clock() - int(100 * time.clock())))
  random.seed(seed)
  
  id_numeric = random.randint(0, 9999)
  #convert to str and pad to 4 digits
  id_num_as_str = str(id_numeric)
  if (len(id_num_as_str) < 4):
    id_num_as_str = '0' * (4 - len(id_num_as_str)) + id_num_as_str
    
  id_alpha = ''.join([random.choice(string.ascii_uppercase) for _ in range(6)])
  
  user_id = id_alpha + str(id_numeric)
  
  print('Creating unique ID')
  print(user_id)
  
  user_data = spark.sql('SELECT * FROM user_id') 
  repeat_user_id = user_data.where(user_data[User_ID_Columns[0]] == user_id).count()
  
  if(repeat_user_id):
    user_id = generate_new_id_value()
  
  
  insert_user_id(user_id)
  
  return user_id  


def insert_user_id(user_id):
  
  df = spark.createDataFrame([[user_id]], User_ID_Columns)
  
  df.write.format('parquet').mode('append').saveAsTable(User_ID_Table)
  


In [56]:
print(generate_new_id_value())

Creating unique ID
IIFLZE1300
IIFLZE1300


In [57]:
spark.sql('SELECT * FROM user_id').show()

+----------+
|   user_id|
+----------+
|MKPXIA3809|
|ZLWJKS8140|
|SHIZCF0984|
|EMJGXM8166|
|IIFLZE1300|
|KSDOXZ7159|
+----------+



In [94]:
def add_new_inputted_data(user_id, questions):
  
  results = [user_id]
  
  for q in questions:
    results.append(raw_input(q))
    
  return results

def add_new_user(user_id):
  
  return add_new_inputted_data(user_id, New_User_Questions)

def add_bank_details(user_id):
  
  return add_new_inputted_data(user_id, New_Bank_Questions)
    
def insert_record(user_id, record, table):
    
  col_names = Table_Details[table]
  df = spark.createDataFrame([record], col_names)
  
  df.write.format('parquet').mode('append').saveAsTable(table)
  
  return df

In [128]:
def edit_details(user_id, table, questions):
  
  results = ['']
  for q in questions:
    replace_answer = raw_input('Edit {0} (y/n) '.format(q[6:]))
    if(replace_answer.lower()[0] == 'y'):
      results.append(raw_input(q))
    else:
      results.append('')
  
  
  entire_data, editing_record = get_original_data(user_id, table)
  
  for idx, column in enumerate(editing_record.columns):
    if(results[idx] != ''):
      editing_record = editing_record.withColumn(column, F.lit(results[idx]))
      
  edit_record(user_id, editing_record, entire_data, table)

def get_original_data(user_id, table):
     
  sql_string = 'SELECT * FROM ' + table
  original_df = spark.sql(sql_string)
  
  old_record = original_df.where(original_df['user_id'] == user_id)
  
  return original_df, old_record
  
def edit_record(user_id, new_record, data, table):
  
  new_data = data.where(data['user_id'] != user_id).union(new_record)

  new_data = cutLineage(new_data)

  new_data.write.format('parquet').mode('overwrite').saveAsTable(table)
    
def edit_user_details(user_id):
    
  edit_details(user_id, Personal_Details_Table, New_User_Questions)
  
def edit_bank_details(user_id):
    
  edit_details(user_id, Bank_Details_Table, New_Bank_Questions)
  


In [98]:
def add_user():
  
  user_id = generate_new_id_value()
  
  new_user = insert_record(user_id, add_new_user(user_id), Personal_Details_Table)
  
  bank_details = insert_record(user_id, add_bank_details(user_id), Bank_Details_Table)



In [99]:
add_user()

Creating unique ID
MLFXVW1753
Enter full name: a
Enter email address: b
Enter date of birth (yyyy-mm-dd): c
Enter country: d
Enter address line 1: e
Enter address line 2: f
Enter city of residence: g
Enter post/zip code: h
Enter account number: 11231314
Enter sort code: 203941
Enter name on account: a
Enter currency type: gbp


In [130]:
edit_bank_details('SHIZCF0984')


Edit account number:  (y/n) n
Edit sort code:  (y/n) n
Edit name on account:  (y/n) y
Enter name on account: bob bobobob
Edit currency type:  (y/n) n


Py4JJavaError: An error occurred while calling o629.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
	at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:532)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:176)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
	at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:433)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
	at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 72.0 failed 4 times, most recent failure: Lost task 0.3 in stage 72.0 (TID 118, cluster-4545-w-1.europe-west1-b.c.p08822-2019-cw-michael-hoskin.internal, executor 1): java.io.FileNotFoundException: File does not exist: hdfs://cluster-4545-m/user/hive/warehouse/bank_details/part-00001-6888a198-1fd1-4703-b23d-4d828ce17219-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:135)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
	... 32 more
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://cluster-4545-m/user/hive/warehouse/bank_details/part-00001-6888a198-1fd1-4703-b23d-4d828ce17219-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:135)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [129]:
spark.sql('select * from {0}'.format(Bank_Details_Table)).show()

+----------+----------+---------+----------------+--------+
|   user_id|acc_number|sort_code|            name|currency|
+----------+----------+---------+----------------+--------+
|SHIZCF0984|  76326171|   848212|Charlotte Cooper|     GBP|
|EMJGXM8166|  19874611|   452152|    Daniel Price|     GBP|
|MKPXIA3809|  12345678|   203212|     Alice Jones|     GBP|
|ZLWJKS8140|  24512315|   422321|       Bob Smith|     GBP|
+----------+----------+---------+----------------+--------+



In [124]:
from pyspark.sql import DataFrame

def cutLineage(df):
    jRDD = df._jdf.toJavaRDD()
    jSchema = df._jdf.schema()
    jRDD.cache()
    sqlCtx = df.sql_ctx
    try:
        javaSqlCtx = sqlCtx._jsqlContext
    except:
        javaSqlCtx = sqlCtx._ssql_ctx
    newJavaDF = javaSqlCtx.createDataFrame(jRDD,jSchema)
    newDF = DataFrame(newJavaDF, sqlCtx)
    
    return newDF
    
    
    

In [134]:
d = spark.createDataFrame([['a',1]], ['colname'])
d = d.union(d)
d.explain()
d.show()

== Physical Plan ==
Union
:- Scan ExistingRDD[colname#1730,_2#1731L]
+- Scan ExistingRDD[colname#1730,_2#1731L]
+-------+---+
|colname| _2|
+-------+---+
|      a|  1|
|      a|  1|
+-------+---+



In [135]:
d2 = cutLineage(d)

In [136]:
d2.explain()
d2.show()

== Physical Plan ==
Scan ExistingRDD[colname#1749,_2#1750L]
+-------+---+
|colname| _2|
+-------+---+
|      a|  1|
|      a|  1|
+-------+---+

