Skip to content

Commit

Permalink
Add Request Passthrough to continuous webjobs
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmelsayed committed Aug 29, 2015
1 parent cab4bfd commit 3e9c208
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 4 deletions.
7 changes: 6 additions & 1 deletion Kudu.Contracts/Jobs/IContinuousJobsManager.cs
@@ -1,9 +1,14 @@
namespace Kudu.Contracts.Jobs
using System.Net.Http;
using System.Threading.Tasks;

namespace Kudu.Contracts.Jobs
{
public interface IContinuousJobsManager : IJobsManager<ContinuousJob>
{
void DisableJob(string jobName);

void EnableJob(string jobName);

Task<HttpResponseMessage> HandleRequest(string jobName, string path, HttpRequestMessage request);
}
}
2 changes: 1 addition & 1 deletion Kudu.Contracts/Jobs/JobSettings.cs
Expand Up @@ -31,7 +31,6 @@ public bool IsSingleton
return GetSetting(JobSettingsKeys.IsSingleton, false);
}
}

public TimeSpan GetStoppingWaitTime(long defaultTime)
{
return TimeSpan.FromSeconds(GetSetting(JobSettingsKeys.StoppingWaitTime, defaultTime));
Expand All @@ -46,5 +45,6 @@ public string GetSchedule()
{
return GetSetting<string>(JobSettingsKeys.Schedule);
}

}
}
40 changes: 40 additions & 0 deletions Kudu.Core.Test/Jobs/ContinuousJobsManagerFacts.cs
@@ -0,0 +1,40 @@
using Kudu.Core.Jobs;
using Kudu.Core.Tracing;
using Kudu.TestHarness;
using Moq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Xunit;

namespace Kudu.Core.Test.Jobs
{
public class ContinuousJobsManagerFacts
{
[Theory]
[InlineData(4567, "path/to/webjob/function", @"{""key"": ""value""}")]
public void CloneRequestTestCase(int port, string path, string content)
{
// Arrange
var headers = new Dictionary<string, string> { { "key", "value" }, { "HOST", "example.com" } };
var request = new HttpRequestMessage(HttpMethod.Put, new Uri("https://kudu.com/" + path));
request.Content = new StringContent(content);
foreach (var pair in headers)
request.Headers.Add(pair.Key, pair.Value);

// Act
var clone = ContinuousJobsManager.GetForwardRequest(port, path, request);

// Assert
Assert.Equal(request.Method, clone.Method);
Assert.Equal(string.Format("http://127.0.0.1:{0}/{1}", port, path), clone.RequestUri.AbsoluteUri);
Assert.Equal(content, clone.Content.ReadAsStringAsync().Result);
Assert.Equal(1, clone.Headers.Count());
Assert.Equal("value", clone.Headers.First().Value.First());
}
}
}
1 change: 1 addition & 0 deletions Kudu.Core.Test/Kudu.Core.Test.csproj
Expand Up @@ -99,6 +99,7 @@
<Compile Include="Infrastructure\XmlUtilityFacts.cs" />
<Compile Include="DeploymentSettingsFacts.cs" />
<Compile Include="Jobs\ContinuousJobRunnerFacts.cs" />
<Compile Include="Jobs\ContinuousJobsManagerFacts.cs" />
<Compile Include="Jobs\ScheduleFacts.cs">
<SubType>Code</SubType>
</Compile>
Expand Down
1 change: 1 addition & 0 deletions Kudu.Core/Deployment/WellKnownEnvironmentVariables.cs
Expand Up @@ -29,6 +29,7 @@ internal static class WellKnownEnvironmentVariables
public const string WebJobsRunId = "WEBJOBS_RUN_ID";
public const string WebJobsShutdownNotificationFile = "WEBJOBS_SHUTDOWN_FILE";
public const string WebJobsCommandArguments = "WEBJOBS_COMMAND_ARGUMENTS";
public const string WebJobsPort = "WEBJOBS_PORT";

public const string CommitId = "SCM_COMMIT_ID";
public const string DnvmPath = "SCM_DNVM_PS_PATH";
Expand Down
6 changes: 5 additions & 1 deletion Kudu.Core/Jobs/BaseJobRunner.cs
Expand Up @@ -181,7 +181,7 @@ protected void InitializeJobInstance(JobBase job, IJobLogger logger)
}
}

protected void RunJobInstance(JobBase job, IJobLogger logger, string runId, string trigger)
protected void RunJobInstance(JobBase job, IJobLogger logger, string runId, string trigger, int port = -1)
{
string scriptFileName = Path.GetFileName(job.ScriptFilePath);
string scriptFileFullPath = Path.Combine(WorkingDirectory, job.RunCommand);
Expand All @@ -205,6 +205,10 @@ protected void RunJobInstance(JobBase job, IJobLogger logger, string runId, stri
exe.EnvironmentVariables[WellKnownEnvironmentVariables.WebJobsDataPath] = JobDataPath;
exe.EnvironmentVariables[WellKnownEnvironmentVariables.WebJobsRunId] = runId;
exe.EnvironmentVariables[WellKnownEnvironmentVariables.WebJobsCommandArguments] = job.CommandArguments;
if (port != -1)
{
exe.EnvironmentVariables[WellKnownEnvironmentVariables.WebJobsPort] = port.ToString();
}

if (_shutdownNotificationFilePath != null)
{
Expand Down
21 changes: 20 additions & 1 deletion Kudu.Core/Jobs/ContinuousJobRunner.cs
Expand Up @@ -6,6 +6,8 @@
using Kudu.Contracts.Settings;
using Kudu.Core.Infrastructure;
using Kudu.Core.Tracing;
using System.Net.Sockets;
using System.Net;

namespace Kudu.Core.Jobs
{
Expand Down Expand Up @@ -54,6 +56,8 @@ protected override TimeSpan IdleTimeout
get { return TimeSpan.MaxValue; }
}

internal int WebJobPort { get; private set; }

private void StartJob(ContinuousJob continuousJob)
{
// Do not go further if already started or job is disabled
Expand Down Expand Up @@ -93,7 +97,8 @@ private void StartJob(ContinuousJob continuousJob)
using (new Timer(LogStillRunning, null, TimeSpan.FromHours(1), TimeSpan.FromHours(12)))
{
InitializeJobInstance(continuousJob, _continuousJobLogger);
RunJobInstance(continuousJob, _continuousJobLogger, String.Empty, String.Empty);
WebJobPort = GetAvailableJobPort();
RunJobInstance(continuousJob, _continuousJobLogger, String.Empty, String.Empty, WebJobPort);
}
if (_started == 1 && !IsDisabled)
Expand Down Expand Up @@ -297,5 +302,19 @@ protected virtual void Dispose(bool disposing)
}
}
}

private int GetAvailableJobPort()
{
var listener = new TcpListener(IPAddress.Loopback, 0);
try
{
listener.Start();
return ((IPEndPoint)listener.LocalEndpoint).Port;
}
finally
{
listener.Stop();
}
}
}
}
38 changes: 38 additions & 0 deletions Kudu.Core/Jobs/ContinuousJobsManager.cs
Expand Up @@ -2,17 +2,22 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using Kudu.Contracts.Jobs;
using Kudu.Contracts.Settings;
using Kudu.Core.Infrastructure;
using Kudu.Core.Tracing;
using System.Net;
using System.Threading.Tasks;
using Kudu.Contracts.Tracing;

namespace Kudu.Core.Jobs
{
public class ContinuousJobsManager : JobsManagerBase<ContinuousJob>, IContinuousJobsManager, IDisposable
{
private const string StatusFilesSearchPattern = ContinuousJobStatus.FileNamePrefix + "*";
private const string Localhost = "127.0.0.1";

private readonly Dictionary<string, ContinuousJobRunner> _continuousJobRunners = new Dictionary<string, ContinuousJobRunner>(StringComparer.OrdinalIgnoreCase);

Expand Down Expand Up @@ -57,6 +62,39 @@ public void EnableJob(string jobName)
continuousJobRunner.EnableJob();
}

public async Task<HttpResponseMessage> HandleRequest(string jobName, string path, HttpRequestMessage request)
{
using (var client = new HttpClient())
{
var jobRunner = GetJobRunner(jobName);
var forwardRequest = GetForwardRequest(jobRunner.WebJobPort, path, request);
return await client.SendAsync(forwardRequest);
}
}

public static HttpRequestMessage GetForwardRequest(int port, string path, HttpRequestMessage original)
{
var webJobBaseUri = new Uri(string.Concat("http://", Localhost, ":", port));
var fullUriPath = new Uri(webJobBaseUri, path);
var clone = new HttpRequestMessage(original.Method, fullUriPath);

if (original.Method != HttpMethod.Get)
{
clone.Content = original.Content;
}

original.Headers.Aggregate(clone.Headers, (a, b) =>
{
if (!b.Key.Equals("HOST", StringComparison.OrdinalIgnoreCase))
{
a.Add(b.Key, b.Value);
}
return a;
});

return clone;
}

private ContinuousJobRunner GetJobRunner(string jobName)
{
ContinuousJobRunner continuousJobRunner;
Expand Down
1 change: 1 addition & 0 deletions Kudu.Services.Web/App_Start/NinjectServices.cs
Expand Up @@ -483,6 +483,7 @@ public static void RegisterRoutes(IKernel kernel, RouteCollection routes)
routes.MapHttpWebJobsRoute("remove-continuous-job", "continuous", "/{jobName}", new { controller = "Jobs", action = "RemoveContinuousJob" }, new { verb = new HttpMethodConstraint("DELETE") });
routes.MapHttpWebJobsRoute("get-continuous-job-settings", "continuous", "/{jobName}/settings", new { controller = "Jobs", action = "GetContinuousJobSettings" }, new { verb = new HttpMethodConstraint("GET") });
routes.MapHttpWebJobsRoute("set-continuous-job-settings", "continuous", "/{jobName}/settings", new { controller = "Jobs", action = "SetContinuousJobSettings" }, new { verb = new HttpMethodConstraint("PUT") });
routes.MapHttpWebJobsRoute("request-passthrough-continuous-job", "continuous", "/{jobName}/passthrough/{*path}", new { controller = "Jobs", action = "RequestPassthrough" }, new { verb = new HttpMethodConstraint("GET", "HEAD", "PUT", "POST", "DELETE", "PATCH") });

// Web Jobs as microservice
routes.MapHttpRoute("list-triggered-jobs-swagger", "api/triggeredwebjobsswagger", new { controller = "Jobs", action = "ListTriggeredJobsInSwaggerFormat" }, new { verb = new HttpMethodConstraint("GET") });
Expand Down
14 changes: 14 additions & 0 deletions Kudu.Services/Jobs/JobsController.cs
Expand Up @@ -253,6 +253,20 @@ public HttpResponseMessage SetTriggeredJobSettings(string jobName, JobSettings j
return SetJobSettings(jobName, jobSettings, _triggeredJobsManager);
}

[AcceptVerbs("GET", "HEAD", "PUT", "POST", "DELETE", "PATCH")]
public async Task<HttpResponseMessage> RequestPassthrough(string jobName, string path)
{
try
{
return await _continuousJobsManager.HandleRequest(jobName, path, Request);
}
catch(Exception e)
{
_tracer.TraceError(e);
return Request.CreateErrorResponse(HttpStatusCode.NotFound, e);
}
}

private HttpResponseMessage ListJobsResponseBasedOnETag(IEnumerable<JobBase> jobs)
{
string etag = GetRequestETag();
Expand Down

0 comments on commit 3e9c208

Please sign in to comment.