In [524]:
# findspark.init() has to run before the first pyspark import: it is the call
# that locates the Spark installation and makes its Python package importable.
import findspark
findspark.init()

import re

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as f



In [525]:
# Line filters for the tab-separated Freebase triples dump.
# Raw strings keep the regex escapes (\. and \t) out of Python's own string
# escape processing, which warns on unknown sequences such as "\/" and "\.".

# English-language type.object.name triples.
re_name = r'(/[gm]\..+\t<http://rdf\.freebase\.com/ns/type\.object\.name>\t".*"@en)'
# Triples whose predicate is in the people.person.* namespace.
re_person = r'(/[gm]\..+\t<http://rdf\.freebase\.com/ns/people\.person\..*>\t)'
# Triples whose predicate is in the people.deceased_person.* namespace.
re_dec_person = r'(/[gm]\..+\t<http://rdf\.freebase\.com/ns/people\.deceased_person\..*>\t)'

# Relative path to the input dump.
pathFile = '../data/freebase-head-100000000'



In [526]:
# Local-mode Spark session with the driver memory set to 15g.
# NOTE: despite its name, `sc` is a SparkSession; the SparkContext is sc.sparkContext.
sc = (
    SparkSession.builder
    .master('local[*]')
    .appName('IR Person entity, FREEBASE')
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)



In [527]:
# Every triple is loaded as three nullable string columns: id, predicate, value.
_value_metadata = {"maxlength": 2048}
schema = StructType([
    StructField('id', StringType(), True),
    StructField('predicate', StringType(), True),
    StructField('value', StringType(), True, metadata=_value_metadata),
])



In [528]:
# Read the dump, keep only name / person / deceased-person triples and
# de-duplicate the raw lines. Each surviving line is then cleaned by removing
# the namespace URL, everything from "^^" or "@" up to the last ".", angle
# brackets, double quotes and the trailing tab + ".", and finally split on tabs.
# The cleanup pattern is a raw string so its escapes reach the regex engine
# untouched by Python's string-escape processing.
freebase = sc.sparkContext.textFile(pathFile)
filtered_data = (
    freebase
    .filter(lambda line: re.search(re_name, line) or re.search(re_person, line) or re.search(re_dec_person, line))
    .distinct()
    .map(lambda line: re.sub(r'(http://rdf\.freebase\.com/ns/)|(\^\^.*\.)|(@.*\.)|<|>|"|(\t\.)', "", line))
    .map(lambda line: line.split('\t'))
)



In [529]:
# Name triples only, cleaned and split the same way as the person triples.
# Built directly from the `freebase` RDD: a separate textFile() read assigned
# to data_reference would be overwritten before it is ever used.
data_reference = (
    freebase
    .filter(lambda line: re.search(re_name, line))
    .distinct()
    .map(lambda line: re.sub(r'(http://rdf\.freebase\.com/ns/)|(\^\^.*\.)|(@.*\.)|<|>|"|(\t\.)', "", line))
    .map(lambda line: line.split('\t'))
)



In [530]:
# Disabled alternative source (common.topic.alias rows):
#filtered_aliases = sc.createDataFrame(data_aliases_filtered.filter(lambda x: "common.topic.alias" in x[1]), schema)

# Lookup table built from the type.object.name rows of data_reference.
# NOTE: the variable is called "aliases" but it holds name rows.
_name_rows = data_reference.filter(lambda triple: "type.object.name" in triple[1])
filtered_aliases = sc.createDataFrame(_name_rows, schema)



In [531]:
def _triples_frame(keep_predicate):
    """Return a DataFrame of the filtered_data rows whose predicate (field 1) passes keep_predicate."""
    return sc.createDataFrame(filtered_data.filter(lambda triple: keep_predicate(triple[1])), schema)


def _mentions(fragment):
    """Build a test that is true when fragment occurs in the predicate string."""
    return lambda predicate: fragment in predicate


# One DataFrame per attribute of interest, selected by substring match on the predicate.
names = _triples_frame(_mentions("type.object.name"))
births = _triples_frame(_mentions("people.person.date_of_birth"))
deaths = _triples_frame(_mentions("people.deceased_person.date_of_death"))
nationality = _triples_frame(_mentions("people.person.nationality"))
height_meters = _triples_frame(_mentions("people.person.height_meters"))
weight_kg = _triples_frame(_mentions("people.person.weight"))
place_of_birth = _triples_frame(_mentions("people.person.place_of_birth"))
# Every remaining "people" predicate except the two date predicates.
others = _triples_frame(
    lambda predicate: "people" in predicate
    and "date_of_birth" not in predicate
    and "date_of_death" not in predicate
)



In [532]:
# Give every birth / death row an empty "note" column.
_blank_note = f.lit("")
births = births.withColumn("note", _blank_note)
deaths = deaths.withColumn("note", _blank_note)



In [533]:
# Expose each DataFrame to Spark SQL under its own name.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it overwrites an existing view of the same name.
_views = (
    ("names", names),
    ("births", births),
    ("deaths", deaths),
    ("nationality", nationality),
    ("height_meters", height_meters),
    ("place_of_birth", place_of_birth),
    ("weight_kg", weight_kg),
    ("others", others),
)
for _view_name, _frame in _views:
    _frame.createOrReplaceTempView(_view_name)



In [534]:
# Bind the SQLContext explicitly to the session the temp views were registered
# on, rather than relying on an implicit active-session lookup.
sql_context = SQLContext(sc.sparkContext, sparkSession=sc)



In [535]:
# One row per named entity that has at least one person-related triple.
# - birth / death: the parsed date when present; otherwise estimated as the
#   other date minus / plus 100*365 days; '' when both are missing.
# - weight_kg / height_meters: cast to double before rounding, so a missing
#   value stays null instead of pushing an empty string into ROUND.
# - birth_validation / death_validation: '' when the birth / death row exists
#   (its note column is ''), otherwise a warning text.
first_iteration_people = sql_context.sql("""
    select names.id as id, names.value as name,
    case
        when births.value is not null then (cast(births.value as date))
        when deaths.value is not null and births.value is null then (cast(deaths.value as date) - 100*365)
        when deaths.value is null and births.value is null then ''
    end as birth,
    case
        when deaths.value is not null then (cast(deaths.value as date))
        when births.value is not null and deaths.value is null then (cast(births.value as date) + 100*365)
        when deaths.value is null and births.value is null then ''
    end as death,
    ifnull(place_of_birth.value, '') as place_of_birth_ref,
    ROUND(cast(weight_kg.value as double), 2) as weight_kg,
    ifnull(nationality.value, '') as nationality_ref,
    ROUND(cast(height_meters.value as double), 2) as height_meters,
    ifnull(births.note, 'Birthdate could not be valid.') as birth_validation,
    ifnull(deaths.note, 'Death date could not be valid..') as death_validation
    from names
    left join births on names.id = births.id
    left join deaths on names.id = deaths.id
    left join nationality on names.id = nationality.id
    left join height_meters on names.id = height_meters.id
    left join place_of_birth on names.id = place_of_birth.id
    left join weight_kg on names.id = weight_kg.id
    left join others on names.id = others.id
    where births.value is not null or deaths.value is not null or nationality.value is not null or height_meters.value is not null 
    or place_of_birth.value is not null or weight_kg.value is not null or others.value is not null
    """)

# The joins repeat a person once per matching row; a single de-duplication
# pass is enough (drop_duplicates() with no subset is the same as distinct()).
first_iteration_people = first_iteration_people.drop_duplicates()



In [536]:
# Rename the key column from "id" to "id_person".
first_iteration_people = first_iteration_people.withColumnRenamed(
    "id",
    "id_person",
)



In [522]:
#first_iteration_people.show()

In [523]:
# Write the result as one CSV part file with a header row, replacing any
# previous output. The Hadoop key must be spelled "fileoutputcommitter";
# a misspelled key is silently ignored and the _SUCCESS marker is still written.
(
    first_iteration_people
    .repartition(1)
    .write
    .mode("overwrite")
    .format("csv")
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .save('outputs/first_iteration', header='true')
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/homebrew/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 



In [537]:
# Left join: every person row is kept, and the lookup's id / predicate / value
# columns are attached where nationality_ref equals a lookup id.
second_interation_nationality = first_iteration_people.join(
    filtered_aliases,
    on=first_iteration_people["nationality_ref"] == filtered_aliases["id"],
    how="left",
)

In [538]:
# Keep the joined value column as "nationality" and discard the lookup's other
# columns together with the reference column.
second_interation_nationality = (
    second_interation_nationality
    .withColumnRenamed("value", "nationality")
    .drop("id", "predicate", "nationality_ref")
)



In [None]:
#second_interation_nationality.show()



+----------+--------------------+----------+----------+------------------+---------+-------------+--------------------+--------------------+-----------+
| id_person|                name|     birth|     death|place_of_birth_ref|weight_kg|height_meters|    birth_validation|    death_validation|nationality|
+----------+--------------------+----------+----------+------------------+---------+-------------+--------------------+--------------------+-----------+
| m.03h4grl|       Justin Walker|1975-09-06|2075-08-12|           m.09tlh|     null|         null|                    |Death date could ...|       null|
| m.03mdr98|       Tommaso Reato|1984-05-12|2084-04-17|          m.03qhpd|     null|         1.96|                    |Death date could ...|       null|
|m.011455mt|    Francesco Stanco|1987-02-26|2087-02-01|          m.0dvsjp|     80.0|         1.91|                    |Death date could ...|       null|
| m.03h0m3j|   Åse Lill Kimestad|1955-08-16|2055-07-22|                  |     nul

In [None]:
# Write the result as one CSV part file with a header row, replacing any
# previous output. The Hadoop key must be spelled "fileoutputcommitter";
# a misspelled key is silently ignored and the _SUCCESS marker is still written.
(
    second_interation_nationality
    .repartition(1)
    .write
    .mode("overwrite")
    .format("csv")
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .save('outputs/second_iteration', header='true')
)



In [539]:
# Left join: every row is kept, and the lookup's id / predicate / value columns
# are attached where place_of_birth_ref equals a lookup id.
third_iteration_place_of_birth = second_interation_nationality.join(
    filtered_aliases,
    on=second_interation_nationality["place_of_birth_ref"] == filtered_aliases["id"],
    how="left",
)

In [540]:
# Keep the joined value column as "place_of_birth" and discard the lookup's
# other columns together with the reference column.
third_iteration_place_of_birth = (
    third_iteration_place_of_birth
    .withColumnRenamed("value", "place_of_birth")
    .drop("id", "predicate", "place_of_birth_ref")
)



In [None]:
#third_iteration_place_of_birth.show()



+----------+--------------------+----------+----------+---------+-------------+--------------------+--------------------+-----------+--------------+
| id_person|                name|     birth|     death|weight_kg|height_meters|    birth_validation|    death_validation|nationality|place_of_birth|
+----------+--------------------+----------+----------+---------+-------------+--------------------+--------------------+-----------+--------------+
|m.010q3gmb|          Jiexi Wang|          |          |     null|         null|Birthdate could n...|Death date could ...|       null|          null|
|m.01117gkg|           R.G. Gopi|          |          |     null|         null|Birthdate could n...|Death date could ...|       null|          null|
|m.01148g28|       Edward Annan |          |          |     null|         null|Birthdate could n...|Death date could ...|       null|          null|
|m.011ctvf7|       Mikhail Yudin|          |          |     null|         null|Birthdate could n...|Death 

In [541]:
# Write the result as one CSV part file with a header row, replacing any
# previous output. The Hadoop key must be spelled "fileoutputcommitter";
# a misspelled key is silently ignored and the _SUCCESS marker is still written.
(
    third_iteration_place_of_birth
    .repartition(1)
    .write
    .mode("overwrite")
    .format("csv")
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .save('outputs/third_iteration', header='true')
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/homebrew/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 



22/11/18 13:34:02 WARN TransportChannelHandler: Exception in connection from /10.10.60.222:61181
java.io.IOException: Operation timed out
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:417)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimi

In [None]:
# Preview only the rows where the place-of-birth lookup produced a value.
_with_birthplace = third_iteration_place_of_birth.filter("place_of_birth IS NOT NULL")
_with_birthplace.show()



+----------+---------------+----------+----------+---------+-------------+----------------+--------------------+-----------+--------------+
| id_person|           name|     birth|     death|weight_kg|height_meters|birth_validation|    death_validation|nationality|place_of_birth|
+----------+---------------+----------+----------+---------+-------------+----------------+--------------------+-----------+--------------+
|  m.03hhpw|Jacques Gaillot|1935-09-11|2035-08-17|     null|         null|                |Death date could ...|       null|  Saint-Dizier|
|m.012hf_jw|   Kedar Gurung|1948-07-09|2048-06-14|     null|         null|                |Death date could ...|       null|        Sikkim|
| m.05b385q|   Kelvin Clark|1956-01-30|2056-01-05|     null|         null|                |Death date could ...|       null|        Odessa|
+----------+---------------+----------+----------+---------+-------------+----------------+--------------------+-----------+--------------+



In [None]:
#third_iteration_place_of_birth.filter("nationality IS NOT NULL").show()



+---------+----------+----------+----------+---------+-------------+----------------+----------------+---------------+--------------+
|id_person|      name|     birth|     death|weight_kg|height_meters|birth_validation|death_validation|    nationality|place_of_birth|
+---------+----------+----------+----------+---------+-------------+----------------+----------------+---------------+--------------+
|m.0268v1r|Jirō Tamon|1878-09-28|1934-11-24|     null|         null|                |                |Empire of Japan|          null|
+---------+----------+----------+----------+---------+-------------+----------------+----------------+---------------+--------------+

