# Vyhľadávanie informácii - projekt - Michal Lüley

In [1]:
# File system management
import os
import sys
import re
import pyspark
from pyspark import jars
import pandas as pd

In [2]:
# File name of the source XML file; it is joined with the 'input_file'
# directory under D_RAW_DIR when the file is loaded with Spark.
SOURCE_FILE_PATH = "articles1.xml"

In [3]:
# Create (or reuse) the local Spark session used by the rest of the notebook
from pyspark.sql import SparkSession

# Point both the PySpark workers and the driver at the interpreter running this notebook
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

spark = (
    SparkSession.builder
    .appName("VINF_disease_searching_luley_michal")
    .master("local[*]")
    .getOrCreate()
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "D:\FIIT\FIIT_skola\4_grade\VINF\source_code\vinfenv\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "D:\FIIT\FIIT_skola\4_grade\VINF\source_code\vinfenv\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Program Files\Python38\lib\socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# Home directory of the project.
# os.path.dirname("VINF") evaluates to the empty string, so the path below is
# simply '..' and resolves to the parent of the current working directory.
relative_root = os.path.join(os.path.dirname("VINF"), '..')
D_RAW_DIR = os.path.realpath(relative_root)

# Loading essential files

In [None]:
# Main document: tab-separated. Null cells are replaced by empty strings so
# that items can later be appended to them as text.
main_document_path = os.path.realpath(os.path.join(D_RAW_DIR, 'source_files', 'main_document.csv'))
main_df = pd.read_csv(main_document_path, sep='\t').fillna('')

# Search keys: one CSV file per category, read with the default separator
countries_document_path = os.path.realpath(os.path.join(D_RAW_DIR, 'search_keys', 'countries.csv'))
countries_df = pd.read_csv(countries_document_path)

symptoms_document_path = os.path.realpath(os.path.join(D_RAW_DIR, 'search_keys', 'symptoms.csv'))
symptoms_df = pd.read_csv(symptoms_document_path)

transmissions_document_path = os.path.realpath(os.path.join(D_RAW_DIR, 'search_keys', 'transmision.csv'))
transmissions_df = pd.read_csv(transmissions_document_path)

In [None]:
# Defining regex patterns to be used for regex searching
def _build_alternation_pattern(terms):
    """Compile a case-insensitive pattern that matches any of *terms* literally.

    Every term is escaped with ``re.escape`` so that characters such as
    ``(``, ``)``, ``.`` or ``+`` inside a name are matched as plain text.
    NOTE(review): this assumes the CSV entries are plain names rather than
    hand-written regular expressions - confirm against the data files.

    Non-string entries (e.g. NaN from pandas) and blank strings are skipped:
    the former cannot be joined into a pattern and the latter would add an
    empty alternative that matches at every position of the text.

    Longer terms are placed first because the first alternative that matches
    wins, so a short term would otherwise shadow a longer one starting with it.
    """
    unique_terms = {term for term in terms if isinstance(term, str) and term.strip()}
    if not unique_terms:
        # "(?!)" can never match, whereas an empty pattern would match everywhere
        return re.compile(r'(?!)')
    ordered_terms = sorted(unique_terms, key=lambda term: (-len(term), term))
    return re.compile('|'.join(map(re.escape, ordered_terms)), re.IGNORECASE)


diseases_pattern = _build_alternation_pattern(main_df['Disease'])
diseases_type_pattern = re.compile('bacteria|virus|viral', re.IGNORECASE)
symptoms_pattern = _build_alternation_pattern(symptoms_df['Symptom'])
countries_pattern = _build_alternation_pattern(countries_df['Country'])
transmission_pattern = _build_alternation_pattern(transmissions_df['Transmission'])

In [None]:
# Read the unzipped source XML file, using the "page" tag as the row tag
source_xml_path = os.path.join(D_RAW_DIR, 'input_file', SOURCE_FILE_PATH)
dfs = (
    spark.read
    .format('com.databricks.spark.xml')
    .option("rowTag", "page")
    .load(source_xml_path)
)

In [None]:
# Keep only the title and text of every page, discard redirect pages and keep
# the pages whose text mentions bacteria, virus or viral (case-insensitive)
from pyspark.sql.functions import col
revision_text = col("revision.text._VALUE")
dfs_valuable = (
    dfs.select(["title", "revision.text._VALUE"])
    .filter(~revision_text.like('#REDIRECT%'))
    .filter(revision_text.rlike('(?i)bacteria|(?i)virus|(?i)viral'))
)

In [None]:
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf

# Defining udf for finding matches between page and item(disease, symptom, transmission, country)
def get_matching_string(line, regex):
    """Return the distinct, lower-cased matches of *regex* found in *line*.

    Args:
        line: Text to search, or None (e.g. a null column value).
        regex: Pattern string or compiled pattern passed to ``re.findall``.

    Returns:
        A list of unique lower-cased matches in order of first appearance,
        or None when *line* is None or nothing was matched.
    """
    if line is None:
        # re.findall raises TypeError on None, so a null value yields no match
        return None
    # dict.fromkeys removes duplicates while keeping first-seen order, so the
    # result (and its first element) is stable between runs; empty matches
    # carry no information and are skipped.
    matches = list(dict.fromkeys(
        str(found).lower() for found in re.findall(regex, line) if found
    ))
    return matches if matches else None

# Defining udf for finding matches between page and disease type
def get_matching_string_type(line, regex):
    """Return the disease type that *regex* matches most often in *line*.

    Matches are lower-cased and "viral" is counted as "virus" before the
    occurrences are tallied, so differently-cased spellings are counted
    together. When several types are equally frequent, the one that appears
    first in *line* is returned.

    Args:
        line: Text to search, or None (e.g. a null column value).
        regex: Pattern string or compiled pattern passed to ``re.findall``.

    Returns:
        The most frequent normalised match, or None when *line* is None or
        nothing was matched.
    """
    if line is None:
        # re.findall raises TypeError on None, so a null value yields no match
        return None

    # Tally the normalised values; dicts keep insertion order, which makes
    # max() below resolve ties in favour of the type seen first.
    occurrences = {}
    for found in re.findall(regex, line):
        if not found:
            continue
        disease_type = found.lower().replace("viral", "virus")
        occurrences[disease_type] = occurrences.get(disease_type, 0) + 1

    if not occurrences:
        return None
    return max(occurrences, key=occurrences.get)

In [None]:
# Wrap the matching helpers as Spark UDFs, one per search pattern
from pyspark.sql.functions import udf

# The disease type is a single string; every other UDF returns an array of strings
string_array_type = ArrayType(StringType())
udf_func_type = udf(lambda text: get_matching_string_type(text, diseases_type_pattern), StringType())
udf_func_diseases = udf(lambda text: get_matching_string(text, diseases_pattern), string_array_type)
udf_func_symptoms = udf(lambda text: get_matching_string(text, symptoms_pattern), string_array_type)
udf_func_countries = udf(lambda text: get_matching_string(text, countries_pattern), string_array_type)
udf_func_transmissions = udf(lambda text: get_matching_string(text, transmission_pattern), string_array_type)

In [None]:
# Derive the disease, type, symptoms, countries and transmissions columns and
# then drop the _VALUE and title columns, which are not needed afterwards
from pyspark.sql.functions import col, regexp_extract
title_column = col('title')
text_column = col('_VALUE')
dfs_final = (
    dfs_valuable
    # The disease is the first match found in the page title; pages without one are dropped
    .withColumn("disease", udf_func_diseases(title_column)[0])
    .na.drop(subset=["disease"])
    .withColumn("type", udf_func_type(text_column))
    .withColumn("symptoms", udf_func_symptoms(text_column))
    .withColumn("countries", udf_func_countries(text_column))
    .withColumn("transmissions", udf_func_transmissions(text_column))
    .drop(text_column)
    .drop(title_column)
)
                

In [None]:
def _find_disease_row(frame, disease):
    """Return the index label of the row of *frame* describing *disease*, or None.

    A case-insensitive exact match on the 'Disease' column is preferred; only
    when there is none does the lookup fall back to the first row whose name
    contains *disease*. The name is escaped so it is always searched literally.
    """
    names = frame['Disease'].astype(str)
    exact = frame.index[names.str.lower() == disease.lower()]
    if len(exact):
        return exact[0]
    partial = frame.index[names.str.contains(re.escape(disease), flags=re.IGNORECASE, na=False)]
    return partial[0] if len(partial) else None


def _merge_unique(existing, new_items, separator="; "):
    """Return *existing* extended by the items of *new_items* it does not list yet.

    *existing* is a separator-joined string (possibly empty). Items are
    compared as whole entries, so "pain" is still added when "chest pain" is
    already present.
    """
    merged = [item for item in str(existing).split(separator) if item]
    seen = set(merged)
    for item in new_items:
        item = str(item)
        if item not in seen:
            seen.add(item)
            merged.append(item)
    return separator.join(merged)


for row in dfs_final.rdd.collect():
    # Getting id of row from main document to know, which line(disease) will be processed
    row_id = _find_disease_row(main_df, row.disease)
    if row_id is None:
        # No line of the main document corresponds to this page; nothing to update
        continue

    # Writing type of disease to main document
    if row.type is not None:
        main_df.at[row_id, 'Type'] = f"{row.type}"

    # Writing symptoms, countries of disease spread and transmissions to main document
    for column, found_items in (
        ('Symptom', row.symptoms),
        ('Country', row.countries),
        ('Transmission', row.transmissions),
    ):
        if found_items is not None:
            main_df.at[row_id, column] = _merge_unique(main_df.at[row_id, column], found_items)

# Saving the main document once, after all rows have been processed, instead
# of rewriting the whole file for every processed row
main_df.to_csv(main_document_path, sep='\t', index=False)