In [None]:
# Dependencies
import pandas as pd
from typing import Dict, List, Tuple, Sequence
from faker import Faker
import gremlin_ingest.tracking as gt
import gremlin_ingest.crud as gc
from gremlin_python.process.graph_traversal import GraphTraversalSource
from gremlin_python.process.graph_traversal import GraphTraversal
from datetime import datetime

In [None]:
#Config variables

#Source tags to be added as properties in vertex/edge documents
## We don't really need these in a small ingest with just one source;
## however, some tracking would be nice in a scaled graph system with multiple sources and ingest pipelines.
## This isn't perfect by any means -- just an example.
def add_ingest_tags(properties:Dict):
    """Stamp a vertex/edge property dict with provenance tags, in place.

    Four keys are written into ``properties`` (overwriting any existing
    values under the same names): the source dataset tag and URL, and the
    tag and URL of the analysis/ingest code. Nothing is returned; the
    caller's dict is mutated.
    """
    properties.update({
        # Where the raw data came from
        "SourceDataTag": "stackoverflow:stacksample:2.0",
        "SourceDataUrl": "https://www.kaggle.com/stackoverflow/stacksample",
        # Which ingest pipeline produced the graph element
        "SourceAnalysisTag": "janusgraph:stackoverflow-ingestion:0.1",
        "SourceAnalysisUrl": "https://github.com/zblumen/janusgraph-stackoverflow-ingestion/blob/master/prep-stackoverflow-data.ipynb",
    })
    
# Shared Faker instance; insert_user_vertex() calls fake.name() on it to build
# placeholder user names.
fake = Faker()

### Original Data Source
[https://www.kaggle.com/stackoverflow/stacksample](https://www.kaggle.com/stackoverflow/stacksample)
### Current Tabular Schemas

#### Question Table
* **Id (PK)**: Question id
* **OwnerUserId**: Id of user that posted the question
* **CreationDate**: Date of creation
* **CloseDate**: Date question was closed (if applicable)
* **Score**: Question Score
* **Title**: Question title text
* **Body**: Question body free text

#### Answer Table
* **Id (PK)**: Answer id
* **OwnerUserId**: Id of user that posted the answer
* **CreationDate**: Date of creation
* **ParentId (FK)**: Id of the question this answer was posted for
* **Score**: Answer Score
* **Body**: Answer body free text

#### Tag Table
* **Id (FK)**: Question id
* **Tag**: Tag text string

### Target Vertex + Edge Schemas

#### User Vertex
* **UserId**: Id of the user (taken from **OwnerUserId** in the source tables)
* **UserName**: Placeholder name generated with Faker (for now)

#### Question Vertex
* **Id (PK)**: Question id
* **CreationDate**: Date of creation
* **CloseDate**: Date question was closed (if applicable)
* **Score**: Question Score
* **Title**: Question title text
* **Body**: Question body free text
* **TagList**: List of tags

#### Answer Vertex
* **Id (PK)**: Answer id
* **CreationDate**: Date of creation
* **Score**: Answer Score
* **Body**: Answer body free text

#### UserPostsQuestion Edge
(no properties)

#### UserPostsAnswer Edge
(no properties)

#### AnswerIsForQuestion Edge
(no properties)

In [None]:
#Instantiate Vertex Objects to be used in ingestion

# Provenance columns appended to every vertex staging frame. The names match
# the keys written by add_ingest_tags().
_VERTEX_SOURCE_TAG_DTYPES = {
    "SourceDataTag": "object",
    "SourceDataUrl": "object",
    "SourceAnalysisTag": "object",
    "SourceAnalysisUrl": "object",
}


def _empty_vertex_frame(dtypes: Dict[str, str]) -> pd.DataFrame:
    """Build an empty, typed staging frame for one vertex label.

    Args:
        dtypes: Mapping of column name -> pandas dtype string for the
            label-specific columns, in the desired column order.

    Returns:
        A zero-row DataFrame whose columns are ``dtypes`` followed by the
        shared provenance columns, each cast to its declared dtype.

    The column list and the dtype mapping are derived from the same dict so
    they cannot drift apart.
    """
    schema = {**dtypes, **_VERTEX_SOURCE_TAG_DTYPES}
    return pd.DataFrame(columns=list(schema)).astype(schema)


vertexIngestDict  = {}

vertexIngestDict["User"] = _empty_vertex_frame({
    "VertexLabel": "object",
    "SourceId": "object",
    "UserName": "object",
    "VertexCreationDate": "object",
})

# NOTE: datetime columns use an explicit unit ("datetime64[ns]"); pandas 2.x
# refuses to cast to the unit-less "datetime64" dtype.
vertexIngestDict["Question"] = _empty_vertex_frame({
    "VertexLabel": "object",
    "SourceId": "object",
    "CreationDateTime": "datetime64[ns]",
    "CloseDateTime": "datetime64[ns]",
    "PostScore": "int64",
    "QuestionTitle": "object",
    "PostBody": "object",
    "VertexCreationDate": "object",
})

vertexIngestDict["Answer"] = _empty_vertex_frame({
    "VertexLabel": "object",
    "SourceId": "object",
    "CreationDateTime": "datetime64[ns]",
    "PostScore": "int64",
    "PostBody": "object",
    "VertexCreationDate": "object",
})

In [None]:
#Instantiate Edge Objects to be used in ingestion

# Every edge label shares the same staging schema. Declaring it once keeps the
# column list and the dtype mapping in agreement: DataFrame.astype raises
# KeyError when the mapping names a column the frame does not have (which is
# what happened when "FromSourceId"/"ToSourceId" were cast on a frame whose
# columns were "FromId"/"ToId").
_EDGE_INGEST_DTYPES = {
    "EdgeLabel": "object",
    "FromId": "object",
    "ToId": "object",
    "EdgeCreationDate": "object",
    "SourceDataTag": "object",
    "SourceDataUrl": "object",
    "SourceAnalysisTag": "object",
    "SourceAnalysisUrl": "object",
}

# One independent, empty, typed frame per edge label.
edgeIngestDict = {
    edge_label: pd.DataFrame(columns=list(_EDGE_INGEST_DTYPES)).astype(_EDGE_INGEST_DTYPES)
    for edge_label in ("UserPostsQuestion", "UserPostsAnswer", "AnswerIsForQuestion")
}

In [None]:
#Read Questions
# The encoding is passed explicitly (presumably the CSV is not valid UTF-8 -- TODO confirm).
questionsTable = pd.read_csv("./data/Questions.csv", encoding="ISO-8859-1")
# Preview only the first rows rather than rendering the whole table.
questionsTable.head()

In [None]:
## Instantiate ingest tracker
# Vertex tracking schema: a single "GremlinId" field.
vertexTrackingSchema = gt.GraphStagingSchema(
    {"GremlinId": gt.GraphPrepDataTypeEnum.OBJECT}
)
# Edge tracking schema: both endpoint Gremlin ids plus the edge label, all
# declared with the OBJECT staging type.
edgeTrackingSchema = gt.GraphStagingSchema(
    dict.fromkeys(
        ("ToGremlinId", "FromGremlinId", "EdgeLabel"),
        gt.GraphPrepDataTypeEnum.OBJECT,
    )
)
# The tracker is handed to the insert_*/ingest_* functions below.
ingestTracker = gt.GraphIngestTracker(vertexTrackingSchema, edgeTrackingSchema)

## Define functions for creating tracking_ids (expected by GraphIngestTracker methods)
def make_edge_tracking_id(prefix:str, source_vertex_id: str, target_vertex_id: str) -> str:
    """Build an edge tracking id of the form ``<prefix>-<source>-<target>``.

    Args:
        prefix: Short tag identifying the edge kind (e.g. ``"uq"``).
        source_vertex_id: Tracking id of the vertex the edge leaves.
        target_vertex_id: Tracking id of the vertex the edge enters.

    Returns:
        The three parts joined with ``-``.

    The parts are formatted rather than concatenated with ``+`` so that
    non-string ids (ints/floats read from a CSV) do not raise TypeError.

    NOTE: this notebook defines ``make_edge_tracking_id`` a second time further
    down; whichever definition runs last wins, so keep the two in sync.
    """
    return f"{prefix}-{source_vertex_id}-{target_vertex_id}"

def make_vertex_tracking_id(prefix:str, vertex_id: str) -> str:
    """Build a vertex tracking id of the form ``<prefix>-<vertex_id>``.

    Args:
        prefix: Short tag identifying the vertex kind (e.g. ``"user"``).
        vertex_id: Source id of the vertex.

    Returns:
        ``prefix`` and ``vertex_id`` joined with ``-``.

    The id is formatted rather than concatenated with ``+`` so that a numeric
    id taken straight from a DataFrame row does not raise TypeError.
    """
    return f"{prefix}-{vertex_id}"

def insert_user_vertex(
        g: GraphTraversalSource,
        ingest_tracker: gt.GraphIngestTracker,
        tracking_id: str,
        row: pd.Series
        ) -> int:
    """Insert a "User" vertex for the owner of ``row`` and track it.

    Args:
        g: Traversal source passed through to ``gc.insert_vertex``.
        ingest_tracker: Tracker that records the new vertex under ``tracking_id``.
        tracking_id: Tracking id to register the vertex under.
        row: Source row; only ``row["OwnerUserId"]`` is read.

    Returns:
        Whatever ``gc.insert_vertex`` returns for the new vertex (annotated as
        the Gremlin id).
    """
    user_properties = dict(
        VertexLabel="User",
        SourceId=row["OwnerUserId"],
        # The user name is a Faker-generated placeholder, suffixed to mark it as fake.
        UserName=fake.name() + " Fakename",
        VertexCreationDate=datetime.now(),
    )
    add_ingest_tags(user_properties)

    vertex_id = gc.insert_vertex(g, "User", user_properties)
    ingest_tracker.insert_vertex_tracking(tracking_id, {"GremlinId": vertex_id})
    return vertex_id

def insert_question_vertex(
        g: GraphTraversalSource,
        ingest_tracker: gt.GraphIngestTracker,
        tracking_id: str,
        row: pd.Series
        ) -> int:
    """Insert a "Question" vertex built from ``row`` and track it.

    Args:
        g: Traversal source passed through to ``gc.insert_vertex``.
        ingest_tracker: Tracker that records the new vertex under ``tracking_id``.
        tracking_id: Tracking id to register the vertex under.
        row: Question row; reads ``Id``, ``CreationDate``, ``CloseDate``,
            ``Score``, ``Title`` and ``Body``.

    Returns:
        Whatever ``gc.insert_vertex`` returns for the new vertex (annotated as
        the Gremlin id).
    """
    # vertex property name -> column of the questions table it is copied from
    column_for_property = {
        "SourceId": "Id",
        "CreationDateTime": "CreationDate",
        "CloseDateTime": "CloseDate",
        "PostScore": "Score",
        "QuestionTitle": "Title",
        "PostBody": "Body",
    }

    question_properties = {"VertexLabel": "Question"}
    for property_name, column in column_for_property.items():
        question_properties[property_name] = row[column]
    question_properties["VertexCreationDate"] = datetime.now()
    add_ingest_tags(question_properties)

    vertex_id = gc.insert_vertex(g, "Question", question_properties)
    ingest_tracker.insert_vertex_tracking(tracking_id, {"GremlinId": vertex_id})
    return vertex_id

def insert_answer_vertex(
        g: GraphTraversalSource,
        row: pd.Series
        ) -> int:
    """Insert an "Answer" vertex built from ``row``.

    Unlike the user/question inserts, this function takes no tracker and does
    not register the vertex anywhere; it only performs the insert.

    Args:
        g: Traversal source passed through to ``gc.insert_vertex``.
        row: Answer row; reads ``Id``, ``CreationDate``, ``Score`` and ``Body``.

    Returns:
        Whatever ``gc.insert_vertex`` returns for the new vertex (annotated as
        the Gremlin id).
    """
    answer_properties = dict(
        VertexLabel="Answer",
        SourceId=row["Id"],
        CreationDateTime=row["CreationDate"],
        PostScore=row["Score"],
        PostBody=row["Body"],
        VertexCreationDate=datetime.now(),
    )
    add_ingest_tags(answer_properties)
    vertex_id = gc.insert_vertex(g, "Answer", answer_properties)
    return vertex_id

def ingest_stackoverflow_question(
        g: GraphTraversalSource,
        ingest_tracker: gt.GraphIngestTracker, 
        row: pd.Series     
    ):
    """Ingest one question row: its owner (if new), the question, and their edge.

    Steps:
      1. Look up the owning user in the tracker; insert a User vertex only if
         it has not been tracked yet.
      2. Insert the Question vertex and track it under the question's own id.
      3. Record a ``UserPostsQuestion`` edge in the tracker (the edge itself is
         inserted later).

    Args:
        g: Traversal source used for vertex inserts.
        ingest_tracker: Tracker holding already-ingested vertices and pending edges.
        row: Question row; reads ``OwnerUserId`` and ``Id`` here, plus whatever
            the vertex insert helpers read.
    """
    #check if user has already been ingested - and potentially ingest
    # ids are coerced to str because the tracking-id helpers are declared to take strings
    user_tracking_id = make_vertex_tracking_id("user", str(row["OwnerUserId"]))
    if ingest_tracker.vertex_exists(user_tracking_id):
        #get gremlin id from tracker
        user_gremlin_id = ingest_tracker.get_vertex(user_tracking_id)['GremlinId']
    else:
        #insert user vertex
        user_gremlin_id = insert_user_vertex(g,ingest_tracker,user_tracking_id,row)

    #insert question
    # The tracking id must come from the question's own Id: keying it on
    # OwnerUserId would give every question by the same user the same id.
    question_tracking_id = make_vertex_tracking_id("question", str(row["Id"]))
    question_gremlin_id = insert_question_vertex(g,ingest_tracker,question_tracking_id, row)

    #track user->question edge (will insert later)
    edge_tracking_id = make_edge_tracking_id("uq",user_tracking_id,question_tracking_id)
    ingest_tracker.insert_edge_tracking(
        edge_tracking_id,
        {
            "FromGremlinId": user_gremlin_id,
            "ToGremlinId": question_gremlin_id,
            "EdgeLabel": "UserPostsQuestion"
        })

    #TODO: check if there is an answer->question edge waiting for inEdgeIGremlinId
    
def ingest_stackoverflow_answer(
        g: GraphTraversalSource,
        ingest_tracker: gt.GraphIngestTracker, 
        row: pd.Series     
    ):
    """Ingest one answer row: its owner (if new), the answer, and their edge.

    Steps:
      1. Look up the owning user in the tracker; insert a User vertex only if
         it has not been tracked yet.
      2. Insert the Answer vertex.
      3. Record a ``UserPostsAnswer`` edge in the tracker (the edge itself is
         inserted later).

    Args:
        g: Traversal source used for vertex inserts.
        ingest_tracker: Tracker holding already-ingested vertices and pending edges.
        row: Answer row; reads ``OwnerUserId`` and ``Id`` here, plus whatever
            the vertex insert helpers read.
    """
    #check if user has already been ingested - and potentially ingest
    # ids are coerced to str because the tracking-id helpers are declared to take strings
    user_tracking_id = make_vertex_tracking_id("user", str(row["OwnerUserId"]))
    if ingest_tracker.vertex_exists(user_tracking_id):
        #get gremlin id from tracker
        user_gremlin_id = ingest_tracker.get_vertex(user_tracking_id)['GremlinId']
    else:
        #insert user vertex
        user_gremlin_id = insert_user_vertex(g,ingest_tracker,user_tracking_id,row)

    #insert answer
    # The tracking id must come from the answer's own Id: keying it on
    # OwnerUserId would give every answer by the same user the same id.
    answer_tracking_id = make_vertex_tracking_id("answer", str(row["Id"]))
    # NOTE(review): insert_answer_vertex takes no tracker, so the answer vertex
    # itself is not registered under answer_tracking_id -- confirm whether the
    # pending answer->question step needs it to be.
    answer_gremlin_id = insert_answer_vertex(g,row)

    #track user->answer edge (will insert later)
    user_edge_tracking_id = make_edge_tracking_id("ua",user_tracking_id,answer_tracking_id)
    ingest_tracker.insert_edge_tracking(
        user_edge_tracking_id,
        {
            "FromGremlinId": user_gremlin_id,
            "ToGremlinId": answer_gremlin_id,
            "EdgeLabel": "UserPostsAnswer"
        })
    #TODO: generate answer->question edge
    #TODO: check if question has been ingested already
    
    

# Empty staging frame for the outgoing half of an edge (index with EdgeTrackingId):
# the Gremlin id the edge starts from, plus the edge label.
outEdgeStagingDf = pd.DataFrame(
    columns=["FromGremlinId", "EdgeLabel"]
).astype(dict.fromkeys(("FromGremlinId", "EdgeLabel"), "object"))

# Empty staging frame for the incoming half of an edge (index with EdgeTrackingId):
# only the Gremlin id the edge points to.
inEdgeStagingDf = pd.DataFrame(columns=["ToGremlinId"]).astype({"ToGremlinId": "object"})

## Create user tracking dataframe
# Empty frame with a single object-typed "gremlinId" column (index with VertexTrackingId).
UserTrackingDf = pd.DataFrame(columns=["gremlinId"]).astype({"gremlinId": "object"})

## Define insert and write to edge df methods
def make_edge_tracking_id(prefix:str, source_vertex_id: str, target_vertex_id: str) -> str:
    """Build an edge tracking id of the form ``<prefix>-<source>-<target>``.

    NOTE(review): this is a second definition of a function already defined
    earlier in the notebook; it replaces the earlier one once this code runs.
    Consolidate to a single definition when this section is cleaned up.

    Args:
        prefix: Short tag identifying the edge kind.
        source_vertex_id: Id of the vertex the edge leaves.
        target_vertex_id: Id of the vertex the edge enters.

    Returns:
        The three parts joined with ``-``. Parts are formatted rather than
        concatenated with ``+`` so non-string ids do not raise TypeError.
    """
    return f"{prefix}-{source_vertex_id}-{target_vertex_id}"

def make_vertex_id(prefix:str, vertex_id: str) -> str:
    """Build a prefixed vertex id of the form ``<prefix>-<vertex_id>``.

    Args:
        prefix: Short tag identifying the vertex kind.
        vertex_id: Source id of the vertex.

    Returns:
        ``prefix`` and ``vertex_id`` joined with ``-``. The id is formatted
        rather than concatenated with ``+`` so a numeric id does not raise
        TypeError.
    """
    return f"{prefix}-{vertex_id}"





## re-index edge dfs and join them

## insert edges

