In [3]:
import findspark
findspark.init()
import pandas as pd;
import csv;
import numpy as np;
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
import re
from pyspark.sql.types import StringType, ArrayType,StructType,StructField

In [5]:
conf = SparkConf().setAppName('Read & parse text data file in pyspark')
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext.getOrCreate(sc)

In [6]:
PAPER_KEYS_MAP = {
    "#index": "paper_id",
    "#*": "title",
    "#@": "authors",
    "#o": "affiliations",
    "#t": "year",
    "#c": "publication_venue",
    "#%": "ref_ids",
    "#!": "abstract",
}
PAPER_DATASET_PATH = './assets/AMiner-Paper.txt'

AUTHOR_KEYS_MAP = {
    "#index": "author_id",
    "#n": "name",
    "#pc": "paper_count",
    "#cn": "citation_count",
    "#t": "research_interests",
    "#hi": "h_index", 
#     "#upi": "u_p_index", 
#     "#pi": "p_index"
#     "#a": "affiliations",
}
AUTHOR_DATASET_PATH = './assets/AMiner-Author.txt'

In [7]:
def zipDatasetWithIndex(datasetPath):
    lines = sc.textFile(datasetPath).zipWithIndex()
    return lines

In [8]:
def exchangeElementAtIndexToOne(index, zeroArray):
    zeroArray[index] = 1;

def doCumSumForIndexRows(lines):
    # Find the starting row of each data entry
    pos = lines.filter(lambda x: "#index" in x[0]).map(lambda x: x[1]).collect() 
    zeroArray = np.zeros(lines.count(), dtype=int)
    for element in pos:
        exchangeElementAtIndexToOne(element, zeroArray)
    # Calculate cumulative sum of starting data entry indexes
    summedArray = np.cumsum(zeroArray)
    return summedArray;

In [9]:
def createBroarcastedTuple(x, arrayOfStartRowIndexes):
    return (arrayOfStartRowIndexes.value[x[1]], x[0])

def convertDataIntoIndexedTuples(datasetLines, cumSummedIndexRowsArray):
    broadcastedArray = sc.broadcast(cumSummedIndexRowsArray);
    convertedData = datasetLines.map(lambda dataLine: createBroarcastedTuple(dataLine, broadcastedArray))
    return convertedData

In [10]:
list_get = lambda l, x, d=None: d if not l[x] else l[x] # safe getter of list values

def splitIntoKeyValue(stringToSplit, keyMap):
    formattedValue = stringToSplit 
    if type(stringToSplit) is str:
        if len(formattedValue) > 0:
            formattedValue = formattedValue.split(" ", 1)
            key = list_get(formattedValue, 0)
            mappedKey = keyMap.get(key)
            if (mappedKey):
                formattedValue = { mappedKey: list_get(formattedValue, 1, '') }
            else:
                formattedValue = {}
        else:
            formattedValue = {}
    return formattedValue

def appendStringOrListIntoList(lst, elToAppend):
    if elToAppend is not None:
        if type(elToAppend) == str:
            lst.append(elToAppend)
        else:
            lst = lst + elToAppend
    return lst

In [11]:
# Paper related
def convertPaperFeatures(dct, affiliations_data={}, papers_data={}, paper_authors_data={}, publication_venues_data={}, refs_data={}):
    if dct.get("paper_id"):
        paper_id = dct.get("paper_id")   
        affiliations_data["paper_id"] = paper_id
        papers_data["paper_id"] = paper_id
        paper_authors_data["paper_id"] = paper_id
        publication_venues_data["paper_id"] = paper_id
        refs_data["paper_id"] = paper_id
    elif dct.get("affiliations"):
        affiliations_data["affiliations"] = dct.get("affiliations")
    elif dct.get("ref_ids"):
        appendStringOrListIntoList(refs_data["ref_ids"], dct.get("ref_ids"))
    elif dct.get("authors"):
        paper_authors_data["authors"] = dct.get("authors")
    elif dct.get("title"):
        papers_data["title"] = dct.get("title")
    elif dct.get("year"):
        papers_data["year"] = dct.get("year")
    elif dct.get("publication_venue"):
        publication_venues_data["publication_venue"] = dct.get("publication_venue")
        
# Paper related
def reducePaperFeaturesToDict(featureA='', featureB='', keyMap={}):
    papers_data = {}
    affiliations_data = {}
    paper_authors_data = {}
    publication_venues_data = {}
    refs_data = { "ref_ids": [] }
    try:        
        splittedA = splitIntoKeyValue(featureA, keyMap);
        splittedB = splitIntoKeyValue(featureB, keyMap);
        
        if (splittedA.get('papers_data')):
            papers_data = splittedA.get('papers_data')
            affiliations_data = splittedA.get('affiliations_data')
            paper_authors_data = splittedA.get('paper_authors_data')
            publication_venues_data = splittedA.get('publication_venues_data')
            refs_data = splittedA.get('refs_data')
        else:
            convertPaperFeatures(splittedA, affiliations_data, papers_data, paper_authors_data, publication_venues_data, refs_data)
        convertPaperFeatures(splittedB, affiliations_data, papers_data, paper_authors_data, publication_venues_data, refs_data)
        return {
            "papers_data": papers_data,
            "affiliations_data": affiliations_data,
            "paper_authors_data": paper_authors_data,
            "publication_venues_data": publication_venues_data,
            "refs_data": refs_data
        }
    except Exception as error:
        print("ERROR: ", featureA, featureB, error)
        raise

In [12]:
# Author related
def convertAuthorFeatures(dct, authors_data={}, research_interests_data={}):
    if dct.get("author_id"):
        author_id = dct.get("author_id")   
        authors_data["author_id"] = author_id
        research_interests_data["author_id"] = author_id
    elif dct.get("name"):
        authors_data["name"] = dct.get("name")
    elif dct.get("paper_count"):
        authors_data["paper_count"] = dct.get("paper_count")
    elif dct.get("сitation_count"):
        authors_data["сitation_count"] = dct.get("сitation_count")
    elif dct.get("research_interests"):
        research_interests_data["research_interests"] = dct.get("research_interests")
    elif dct.get("h_index"):
        authors_data["h_index"] = dct.get("h_index")
    elif dct.get("publication_venue"):
        authors_data["publication_venue"] = dct.get("publication_venue")
        
# Author related
def reduceAuthorFeaturesToDict(featureA='', featureB='', keyMap={}):
    authors_data = {}
    research_interests_data = {}
    try:        
        splittedA = splitIntoKeyValue(featureA, keyMap);
        splittedB = splitIntoKeyValue(featureB, keyMap);
        
        if (splittedA.get('authors_data')):
            authors_data = splittedA.get('authors_data')
            research_interests_data = splittedA.get('research_interests_data')
        else: # If it's first reduce run, accumulator === the first item of the list. So the item should be converted
            convertAuthorFeatures(splittedA, authors_data, research_interests_data)
        convertAuthorFeatures(splittedB, authors_data, research_interests_data)
        return {
            "authors_data": authors_data,
            "research_interests_data": research_interests_data,
        }
    except Exception as error:
        print("ERROR: ", featureA, featureB, error)
        raise

In [13]:
def convertDataIntoDicts(objects, dataKeyMap, reducer):
    reducedObjects = objects.reduceByKey(lambda a, b: reducer(a, b, dataKeyMap))
    print(reducedObjects.sortByKey(ascending=False).first())
    mappedDicts = reducedObjects.map(lambda x: x[1]) # retrieve dicts from the tuples
    return mappedDicts

def convertDictsArrayIntoCSVFile(dictsArray, fileName):
    df = sqlContext.createDataFrame(dictsArray)
    print(df.show())
    # Save data to csv file
    df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(fileName)

In [14]:
paperTextLines = zipDatasetWithIndex(PAPER_DATASET_PATH)

                                                                                

In [15]:
summedArrayOfPaperIndexRows = doCumSumForIndexRows(paperTextLines)

                                                                                

In [16]:
paperItemTuples = convertDataIntoIndexedTuples(paperTextLines, summedArrayOfPaperIndexRows)

In [17]:
paperDictsArray = convertDataIntoDicts(paperItemTuples, PAPER_KEYS_MAP, reducePaperFeaturesToDict)



(2092356, {'papers_data': {'paper_id': '2092356', 'title': 'Reliability prediction through system modeling', 'year': '2013'}, 'affiliations_data': {'paper_id': '2092356', 'affiliations': 'Dept of Computer Engg IIT(BHU) Varanasi, India;Reactor Safety Division Bhabha Atomic Research Centre Dept of Atomic Energy, Govt of India;Dept of Computer Engg IIT(BHU) Varanasi, India'}, 'paper_authors_data': {'paper_id': '2092356', 'authors': 'Lalit Kumar Singh;Gopika Vinod;A. K. Tripathi'}, 'publication_venues_data': {'paper_id': '2092356', 'publication_venue': 'ACM SIGSOFT Software Engineering Notes'}, 'refs_data': {'ref_ids': ['215579', '333683', '511383', '594375', '641666', '763878', '966860', '1056157'], 'paper_id': '2092356'}})


                                                                                

In [19]:
def mapPapersData(data):
    refs_data = data["refs_data"]
    refs_data['ref_ids'] = ';'.join(refs_data['ref_ids'])
    return refs_data

papers_d = paperDictsArray.map(lambda x: x["papers_data"])
affiliations_d = paperDictsArray.map(lambda x: x["affiliations_data"])
paper_authors_d = paperDictsArray.map(lambda x: x["paper_authors_data"])
publication_venues_d = paperDictsArray.map(lambda x: x["publication_venues_data"])
paper_refs_d = paperDictsArray.map(lambda x:mapPapersData(x))

In [31]:
print(papers_d.count())



2092356


                                                                                

In [20]:
# convertDictsArrayIntoCSVFile(papers_d, "./assets/papers_d9.csv")
# convertDictsArrayIntoCSVFile(affiliations_d, "./assets/affiliations_d9.csv")
# convertDictsArrayIntoCSVFile(paper_authors_d, "./assets/paper_authors_d9.csv")
# convertDictsArrayIntoCSVFile(publication_venues_d, "./assets/publication_venues_d9.csv")
convertDictsArrayIntoCSVFile(paper_refs_d, "./assets/paper_refs_d9.csv")

                                                                                

+--------+--------------------+
|paper_id|             ref_ids|
+--------+--------------------+
|      65|                    |
|     130|                    |
|     195|317424;317425;317573|
|     260|                    |
|     325|                    |
|     390|                    |
|     455|                    |
|     520|       318368;323493|
|     585|                    |
|     650|                    |
|     715|                    |
|     780|318420;319233;319...|
|     845|                    |
|     910|                    |
|     975|67604;318882;3718...|
|    1040|                    |
|    1105|289087;318014;318...|
|    1170|                    |
|    1235|                    |
|    1300|                    |
+--------+--------------------+
only showing top 20 rows

None


                                                                                

In [15]:
## Authors
authorTextLines = zipDatasetWithIndex(AUTHOR_DATASET_PATH)

                                                                                

In [16]:
summedArrayOfAuthorIndexRows = doCumSumForIndexRows(authorTextLines)

                                                                                

In [17]:
authorItemTuples = convertDataIntoIndexedTuples(authorTextLines, summedArrayOfAuthorIndexRows)

In [18]:
print(authorItemTuples.take(10))

[(1, '#index 1'), (1, '#n O. Willum'), (1, '#a Res. Center for Microperipherik, Technische Univ. Berlin, Germany'), (1, '#pc 1'), (1, '#cn 0'), (1, '#hi 0'), (1, '#pi 0.0000'), (1, '#upi 0.0000'), (1, '#t new product;product group;active product;long product lifetime;old product;product generation;new technology;environmental benefit;environmental choice;environmental consequence'), (1, '')]


In [19]:
authorDictsArray = convertDataIntoDicts(authorItemTuples, AUTHOR_KEYS_MAP, reduceAuthorFeaturesToDict)



(1712433, {'authors_data': {'author_id': '1712433', 'name': 'Andrea Gantchev', 'paper_count': '2', 'h_index': '1'}, 'research_interests_data': {'author_id': '1712433', 'research_interests': 'subsumption architecture;Subsumption ArchitectureThe subsumption architecture;software architecture;subsumption architectureReusable Strategies;Object-oriented design;object-oriented software design;Rodney Brooks;Software Agents;behaviour-based control;different micro-strategies'}})


                                                                                

In [21]:
authors_d = authorDictsArray.map(lambda x: x["authors_data"])
research_interests_d = authorDictsArray.map(lambda x: x["research_interests_data"])

In [22]:
convertDictsArrayIntoCSVFile(authors_d, "./assets/authors_d8.csv")
convertDictsArrayIntoCSVFile(research_interests_d, "./assets/research_interests_d8.csv")



+---------+-------+--------------------+-----------+
|author_id|h_index|                name|paper_count|
+---------+-------+--------------------+-----------+
|       17|      0|     J. Michael Howe|          1|
|       34|      0|        Haitham Gabr|          2|
|       51|      1|         Emma Tonkin|          8|
|       68|      1|        Woochul Shin|          4|
|       85|      0|           S Improta|          1|
|      102|      2|       Richard Ferri|          5|
|      119|      0|            Qing Liu|          1|
|      136|      0|      Artur Gramacki|          2|
|      153|      0|Olumuyiwa Oluwasanmi|          2|
|      170|      0|    Josef Willenborg|          1|
|      187|      0|            Qing Wei|          1|
|      204|      1|Jurey Ivanovich Z...|          1|
|      221|      1|             Anny Ng|          1|
|      238|      1|    Nikos B. Pronios|          3|
|      255|      0| Lourdes Fraga Alman|          1|
|      272|      1|       Junji Nishino|      

                                                                                

+---------+--------------------+
|author_id|  research_interests|
+---------+--------------------+
|       17|HIV disease;Inter...|
|       34|associate polynom...|
|       51|metadata element;...|
|       68|Web Service;conte...|
|       85|intermediate key;...|
|      102|feedback loop;dif...|
|      119|Rough Set;nomal C...|
|      136|MATLAB toolbox;li...|
|      153|Byzantine agreeme...|
|      170|Ein objektorienti...|
|      187|portable device;A...|
|      204|Integer-valued pr...|
|      221|stock price;stock...|
|      238|Hypermedia Synchr...|
|      255|computer-mediated...|
|      272|Dijkstra method;o...|
|      289|low-frequency act...|
|      306|copyright process...|
|      323|uncertain informa...|
|      340|histology image;s...|
+---------+--------------------+
only showing top 20 rows

None


                                                                                